Merge branch 'feature/mqtt_intro_new_modes' into 'master'

MQTT:  Update submodule reference to support new config modes

See merge request espressif/esp-idf!11636
pull/6416/head
David Čermák 2021-01-06 06:25:54 +08:00
commit b5db978499
7 zmienionych plików z 218 dodań i 39 usunięć

Wyświetl plik

@ -26,6 +26,30 @@ menu "ESP-MQTT Configurations"
help
Enable MQTT transport over Websocket Secure.
config MQTT_MSG_ID_INCREMENTAL
bool "Use Incremental Message Id"
default n
help
Set this to true for the message id (2.3.1 Packet Identifier) to be generated
as an incremental number rather then a random value (used by default)
config MQTT_SKIP_PUBLISH_IF_DISCONNECTED
bool "Skip publish if disconnected"
default n
help
Set this to true to avoid publishing (enqueueing messages) if the client is disconnected.
The MQTT client tries to publish all messages by default, even in the disconnected state
(where the qos1 and qos2 packets are stored in the internal outbox to be published later)
The MQTT_SKIP_PUBLISH_IF_DISCONNECTED option allows applications to override this behaviour
and not enqueue publish packets in the disconnected state.
config MQTT_REPORT_DELETED_MESSAGES
bool "Report deleted messages"
default n
help
Set this to true to post events for all messages which were deleted from the outbox
before being correctly sent and confirmed.
config MQTT_USE_CUSTOM_CONFIG
bool "MQTT Using custom configurations"
default n

@ -1 +1 @@
Subproject commit da850b0add1e71b3659bfac5d797cc834dc3e89b
Subproject commit 9ea804e0ab5368d5ab53ae2301a5fec9d1f12f1a

Wyświetl plik

@ -56,7 +56,7 @@ def on_message(client, userdata, msg):
message_log += "Received data:" + msg.topic + " " + payload + "\n"
def test_single_config(dut, transport, qos, repeat, published):
def test_single_config(dut, transport, qos, repeat, published, queue=0):
global expected_count
global expected_data
global message_log
@ -65,7 +65,7 @@ def test_single_config(dut, transport, qos, repeat, published):
expected_count = 0
message_log = ""
expected_data = sample_string * repeat
print("PUBLISH TEST: transport:{}, qos:{}, sequence:{}, sample msg:'{}'".format(transport, qos, published, expected_data))
print("PUBLISH TEST: transport:{}, qos:{}, sequence:{}, enqueue:{}, sample msg:'{}'".format(transport, qos, published, queue, expected_data))
client = None
try:
if transport in ["ws", "wss"]:
@ -89,7 +89,7 @@ def test_single_config(dut, transport, qos, repeat, published):
if not event_client_connected.wait(timeout=30):
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_host[transport]))
client.subscribe(subscribe_topic, qos)
dut.write("{} {} {} {} {}".format(transport, sample_string, repeat, published, qos), eol="\n")
dut.write(' '.join(str(x) for x in (transport, sample_string, repeat, published, qos, queue)), eol="\n")
try:
# waiting till subscribed to defined topic
dut.expect(re.compile(r"MQTT_EVENT_SUBSCRIBED"), timeout=30)
@ -105,7 +105,7 @@ def test_single_config(dut, transport, qos, repeat, published):
if expected_count == published or (expected_count > published and qos == 1):
print("All data received from ESP32...")
else:
raise ValueError("Not all data received from ESP32: Expected:{}x{}, Received:{}x{}".format(expected_data, published, message_log, expected_count))
raise ValueError("Not all data received from ESP32: Expected:{}x{}, Received:{}x{}".format(expected_count, published, expected_data, message_log))
finally:
event_stop_client.set()
thread1.join()
@ -149,29 +149,30 @@ def test_weekend_mqtt_publish(env, extra_data):
raise
for qos in [0, 1, 2]:
for transport in ["tcp", "ssl", "ws", "wss"]:
if broker_host[transport] is None:
print('Skipping transport: {}...'.format(transport))
continue
# simple test with empty message
test_single_config(dut1, transport, qos, 0, 5)
# decide on broker what level of test will pass (local broker works the best)
if broker_host[transport].startswith("192.168") and qos < 1:
# medium size, medium repeated
test_single_config(dut1, transport, qos, 5, 50)
# long data
test_single_config(dut1, transport, qos, 1000, 10)
# short data, many repeats
test_single_config(dut1, transport, qos, 2, 200)
elif transport in ["ws", "wss"]:
# more relaxed criteria for websockets!
test_single_config(dut1, transport, qos, 2, 5)
test_single_config(dut1, transport, qos, 50, 1)
test_single_config(dut1, transport, qos, 10, 20)
else:
# common configuration should be good for most public mosquittos
test_single_config(dut1, transport, qos, 5, 10)
test_single_config(dut1, transport, qos, 500, 3)
test_single_config(dut1, transport, qos, 1, 50)
for q in [0, 1]:
if broker_host[transport] is None:
print('Skipping transport: {}...'.format(transport))
continue
# simple test with empty message
test_single_config(dut1, transport, qos, 0, 5, q)
# decide on broker what level of test will pass (local broker works the best)
if broker_host[transport].startswith("192.168") and qos > 0 and q == 0:
# medium size, medium repeated
test_single_config(dut1, transport, qos, 5, 50, q)
# long data
test_single_config(dut1, transport, qos, 1000, 10, q)
# short data, many repeats
test_single_config(dut1, transport, qos, 2, 200, q)
elif transport in ["ws", "wss"]:
# more relaxed criteria for websockets!
test_single_config(dut1, transport, qos, 2, 5, q)
test_single_config(dut1, transport, qos, 50, 1, q)
test_single_config(dut1, transport, qos, 10, 20, q)
else:
# common configuration should be good for most public mosquittos
test_single_config(dut1, transport, qos, 5, 10, q)
test_single_config(dut1, transport, qos, 500, 3, q)
test_single_config(dut1, transport, qos, 1, 50, q)
if __name__ == '__main__':

Wyświetl plik

@ -28,6 +28,7 @@
#include "esp_log.h"
#include "mqtt_client.h"
#include "rsa_sign_alt.h"
/* pre_prov - name of partition containing encrypted prv key parameters ( It is set as such to synchronize it with the pre provisioning service */
#define NVS_PARTITION_NAME "pre_prov"

Wyświetl plik

@ -8,6 +8,11 @@ import subprocess
from threading import Thread, Event
import ttfw_idf
import ssl
import paho.mqtt.client as mqtt
import string
import random
DEFAULT_MSG_SIZE = 16
def _path(f):
@ -37,6 +42,109 @@ def get_my_ip():
return IP
# Publisher class creating a python client to send/receive published data from esp-mqtt client
class MqttPublisher:
def __init__(self, dut, transport, qos, repeat, published, queue, publish_cfg, log_details=False):
# instance variables used as parameters of the publish test
self.event_stop_client = Event()
self.sample_string = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE))
self.client = None
self.dut = dut
self.log_details = log_details
self.repeat = repeat
self.publish_cfg = publish_cfg
self.publish_cfg["qos"] = qos
self.publish_cfg["queue"] = queue
self.publish_cfg["transport"] = transport
# static variables used to pass options to and from static callbacks of paho-mqtt client
MqttPublisher.event_client_connected = Event()
MqttPublisher.event_client_got_all = Event()
MqttPublisher.published = published
MqttPublisher.event_client_connected.clear()
MqttPublisher.event_client_got_all.clear()
MqttPublisher.expected_data = self.sample_string * self.repeat
def print_details(self, text):
if self.log_details:
print(text)
def mqtt_client_task(self, client):
while not self.event_stop_client.is_set():
client.loop()
# The callback for when the client receives a CONNACK response from the server (needs to be static)
@staticmethod
def on_connect(_client, _userdata, _flags, _rc):
MqttPublisher.event_client_connected.set()
# The callback for when a PUBLISH message is received from the server (needs to be static)
@staticmethod
def on_message(client, userdata, msg):
payload = msg.payload.decode()
if payload == MqttPublisher.expected_data:
userdata += 1
client.user_data_set(userdata)
if userdata == MqttPublisher.published:
MqttPublisher.event_client_got_all.set()
def __enter__(self):
qos = self.publish_cfg["qos"]
queue = self.publish_cfg["queue"]
transport = self.publish_cfg["transport"]
broker_host = self.publish_cfg["broker_host_" + transport]
broker_port = self.publish_cfg["broker_port_" + transport]
# Start the test
self.print_details("PUBLISH TEST: transport:{}, qos:{}, sequence:{}, enqueue:{}, sample msg:'{}'"
.format(transport, qos, MqttPublisher.published, queue, MqttPublisher.expected_data))
try:
if transport in ["ws", "wss"]:
self.client = mqtt.Client(transport="websockets")
else:
self.client = mqtt.Client()
self.client.on_connect = MqttPublisher.on_connect
self.client.on_message = MqttPublisher.on_message
self.client.user_data_set(0)
if transport in ["ssl", "wss"]:
self.client.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
self.client.tls_insecure_set(True)
self.print_details("Connecting...")
self.client.connect(broker_host, broker_port, 60)
except Exception:
self.print_details("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}".format(broker_host))
raise
# Starting a py-client in a separate thread
thread1 = Thread(target=self.mqtt_client_task, args=(self.client,))
thread1.start()
self.print_details("Connecting py-client to broker {}:{}...".format(broker_host, broker_port))
if not MqttPublisher.event_client_connected.wait(timeout=30):
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_host))
self.client.subscribe(self.publish_cfg["subscribe_topic"], qos)
self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol="\n")
try:
# waiting till subscribed to defined topic
self.dut.expect(re.compile(r"MQTT_EVENT_SUBSCRIBED"), timeout=30)
for _ in range(MqttPublisher.published):
self.client.publish(self.publish_cfg["publish_topic"], self.sample_string * self.repeat, qos)
self.print_details("Publishing...")
self.print_details("Checking esp-client received msg published from py-client...")
self.dut.expect(re.compile(r"Correct pattern received exactly x times"), timeout=60)
if not MqttPublisher.event_client_got_all.wait(timeout=60):
raise ValueError("Not all data received from ESP32")
print(" - all data received from ESP32")
finally:
self.event_stop_client.set()
thread1.join()
def __exit__(self, exc_type, exc_value, traceback):
self.client.disconnect()
self.event_stop_client.clear()
# Simple server for mqtt over TLS connection
class TlsServer:
@ -143,9 +251,18 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
binary_file = os.path.join(dut1.app.binary_path, "mqtt_publish_connect_test.bin")
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("mqtt_publish_connect_test_bin_size", "{}KB".format(bin_size // 1024))
# Look for test case symbolic names
# Look for test case symbolic names and publish configs
cases = {}
publish_cfg = {}
try:
def get_host_port_from_dut(dut1, config_option):
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()[config_option])
if value is None:
return None, None
return value.group(1), int(value.group(2))
# Get connection test cases configuration: symbolic names for test cases
for i in ["CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT",
"CONFIG_EXAMPLE_CONNECT_CASE_SERVER_CERT",
"CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH",
@ -155,6 +272,14 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
"CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT",
"CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN"]:
cases[i] = dut1.app.get_sdkconfig()[i]
# Get publish test configuration
publish_cfg["publish_topic"] = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_SUBSCIBE_TOPIC"].replace('"','')
publish_cfg["subscribe_topic"] = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_PUBLISH_TOPIC"].replace('"','')
publish_cfg["broker_host_ssl"], publish_cfg["broker_port_ssl"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_SSL_URI")
publish_cfg["broker_host_tcp"], publish_cfg["broker_port_tcp"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_TCP_URI")
publish_cfg["broker_host_ws"], publish_cfg["broker_port_ws"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WS_URI")
publish_cfg["broker_host_wss"], publish_cfg["broker_port_wss"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WSS_URI")
except Exception:
print('ENV_TEST_FAILURE: Some mandatory test case not found in sdkconfig')
raise
@ -162,13 +287,14 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
dut1.start_app()
esp_ip = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)
print("Got IP={}".format(esp_ip[0]))
#
# start connection test
ip = get_my_ip()
set_server_cert_cn(ip)
server_port = 2222
def start_case(case, desc):
def start_connection_case(case, desc):
print("Starting {}: {}".format(case, desc))
case_id = cases[case]
dut1.write("conn {} {} {}".format(ip, server_port, case_id))
@ -178,14 +304,14 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
for case in ["CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT", "CONFIG_EXAMPLE_CONNECT_CASE_SERVER_CERT", "CONFIG_EXAMPLE_CONNECT_CASE_SERVER_DER_CERT"]:
# All these cases connect to the server with no server verification or with server only verification
with TlsServer(server_port):
test_nr = start_case(case, "default server - expect to connect normally")
test_nr = start_connection_case(case, "default server - expect to connect normally")
dut1.expect("MQTT_EVENT_CONNECTED: Test={}".format(test_nr), timeout=30)
with TlsServer(server_port, refuse_connection=True):
test_nr = start_case(case, "ssl shall connect, but mqtt sends connect refusal")
test_nr = start_connection_case(case, "ssl shall connect, but mqtt sends connect refusal")
dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
dut1.expect("MQTT ERROR: 0x5") # expecting 0x5 ... connection not authorized error
with TlsServer(server_port, client_cert=True) as s:
test_nr = start_case(case, "server with client verification - handshake error since client presents no client certificate")
test_nr = start_connection_case(case, "server with client verification - handshake error since client presents no client certificate")
dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
dut1.expect("ESP-TLS ERROR: 0x8010") # expect ... handshake error (PEER_DID_NOT_RETURN_A_CERTIFICATE)
if "PEER_DID_NOT_RETURN_A_CERTIFICATE" not in s.get_last_ssl_error():
@ -194,12 +320,12 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
for case in ["CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH", "CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD"]:
# These cases connect to server with both server and client verification (client key might be password protected)
with TlsServer(server_port, client_cert=True):
test_nr = start_case(case, "server with client verification - expect to connect normally")
test_nr = start_connection_case(case, "server with client verification - expect to connect normally")
dut1.expect("MQTT_EVENT_CONNECTED: Test={}".format(test_nr), timeout=30)
case = "CONFIG_EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT"
with TlsServer(server_port) as s:
test_nr = start_case(case, "invalid server certificate on default server - expect ssl handshake error")
test_nr = start_connection_case(case, "invalid server certificate on default server - expect ssl handshake error")
dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
dut1.expect("ESP-TLS ERROR: 0x8010") # expect ... handshake error (TLSV1_ALERT_UNKNOWN_CA)
if "alert unknown ca" not in s.get_last_ssl_error():
@ -207,7 +333,7 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
case = "CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT"
with TlsServer(server_port, client_cert=True) as s:
test_nr = start_case(case, "Invalid client certificate on server with client verification - expect ssl handshake error")
test_nr = start_connection_case(case, "Invalid client certificate on server with client verification - expect ssl handshake error")
dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
dut1.expect("ESP-TLS ERROR: 0x8010") # expect ... handshake error (CERTIFICATE_VERIFY_FAILED)
if "CERTIFICATE_VERIFY_FAILED" not in s.get_last_ssl_error():
@ -215,7 +341,7 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
for case in ["CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT", "CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN"]:
with TlsServer(server_port, use_alpn=True) as s:
test_nr = start_case(case, "server with alpn - expect connect, check resolved protocol")
test_nr = start_connection_case(case, "server with alpn - expect connect, check resolved protocol")
dut1.expect("MQTT_EVENT_CONNECTED: Test={}".format(test_nr), timeout=30)
if case == "CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT" and s.get_negotiated_protocol() is None:
print(" - client with alpn off, no negotiated protocol: OK")
@ -224,6 +350,25 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
else:
raise Exception("Unexpected negotiated protocol {}".format(s.get_negotiated_protocol()))
#
# start publish tests
def start_publish_case(transport, qos, repeat, published, queue):
print("Starting Publish test: transport:{}, qos:{}, nr_of_msgs:{}, msg_size:{}, enqueue:{}"
.format(transport, qos, published, repeat * DEFAULT_MSG_SIZE, queue))
with MqttPublisher(dut1, transport, qos, repeat, published, queue, publish_cfg):
pass
for qos in [0, 1, 2]:
for transport in ["tcp", "ssl", "ws", "wss"]:
for q in [0, 1]:
if publish_cfg["broker_host_" + transport] is None:
print('Skipping transport: {}...'.format(transport))
continue
start_publish_case(transport, qos, 0, 5, q)
start_publish_case(transport, qos, 2, 5, q)
start_publish_case(transport, qos, 50, 1, q)
start_publish_case(transport, qos, 10, 20, q)
if __name__ == '__main__':
test_app_protocol_mqtt_publish_connect()

Wyświetl plik

@ -126,13 +126,14 @@ void publish_test(const char* line)
char pattern[32];
char transport[32];
int repeat = 0;
int enqueue = 0;
static bool is_mqtt_init = false;
if (!is_mqtt_init) {
mqtt_app_start();
is_mqtt_init = true;
}
sscanf(line, "%s %s %d %d %d", transport, pattern, &repeat, &expected_published, &qos_test);
sscanf(line, "%s %s %d %d %d %d", transport, pattern, &repeat, &expected_published, &qos_test, &enqueue);
ESP_LOGI(TAG, "PATTERN:%s REPEATED:%d PUBLISHED:%d\n", pattern, repeat, expected_published);
int pattern_size = strlen(pattern);
free(expected_data);
@ -169,7 +170,12 @@ void publish_test(const char* line)
xEventGroupWaitBits(mqtt_event_group, CONNECTED_BIT, false, true, portMAX_DELAY);
for (int i = 0; i < expected_published; i++) {
int msg_id = esp_mqtt_client_publish(mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, expected_data, expected_size, qos_test, 0);
int msg_id;
if (enqueue) {
msg_id = esp_mqtt_client_enqueue(mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, expected_data, expected_size, qos_test, 0, true);
} else {
msg_id = esp_mqtt_client_publish(mqtt_client, CONFIG_EXAMPLE_PUBLISH_TOPIC, expected_data, expected_size, qos_test, 0);
}
ESP_LOGI(TAG, "[%d] Publishing...", msg_id);
}
}

Wyświetl plik

@ -7,3 +7,5 @@ CONFIG_MBEDTLS_ASYMMETRIC_CONTENT_LEN=y
CONFIG_MBEDTLS_SSL_IN_CONTENT_LEN=16384
CONFIG_MBEDTLS_SSL_OUT_CONTENT_LEN=16384
CONFIG_ESP_NETIF_TCPIP_ADAPTER_COMPATIBLE_LAYER=n
CONFIG_ESP_TLS_INSECURE=y
CONFIG_ESP_TLS_SKIP_SERVER_CERT_VERIFY=y