simple_ota_example: Adds sha256 check for app images

pull/7041/head
KonstantinKondrashov 2021-04-27 18:56:02 +08:00
rodzic ca481e18e1
commit 2e4b625f59
3 zmienionych plików z 91 dodań i 1 usunięć

Wyświetl plik

@ -7,7 +7,7 @@ import sys
from threading import Thread
import ttfw_idf
from tiny_test_fw import DUT
from tiny_test_fw import DUT, Utility
server_cert = '-----BEGIN CERTIFICATE-----\n' \
'MIIDXTCCAkWgAwIBAgIJAP4LF7E72HakMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV\n'\
@ -92,6 +92,29 @@ def start_https_server(ota_image_dir, server_ip, server_port, server_file=None,
httpd.serve_forever()
def check_sha256(sha256_expected, sha256_reported):
Utility.console_log('sha256_expected: %s' % (sha256_expected))
Utility.console_log('sha256_reported: %s' % (sha256_reported))
if sha256_reported not in sha256_expected:
raise ValueError('SHA256 mismatch')
else:
Utility.console_log('SHA256 expected and reported are the same')
def calc_all_sha256(dut):
bootloader_path = os.path.join(dut.app.binary_path, 'bootloader', 'bootloader.bin')
output = dut.image_info(bootloader_path)
sha256_bootloader = re.search(r'Validation Hash:\s+([a-f0-9]+)', output).group(1)
Utility.console_log('bootloader SHA256: %s' % sha256_bootloader)
app_path = os.path.join(dut.app.binary_path, 'simple_ota.bin')
output = dut.image_info(app_path)
sha256_app = re.search(r'Validation Hash:\s+([a-f0-9]+)', output).group(1)
Utility.console_log('app SHA256: %s' % sha256_app)
return sha256_bootloader, sha256_app
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_OTA')
def test_examples_protocol_simple_ota_example(env, extra_data):
"""
@ -105,6 +128,7 @@ def test_examples_protocol_simple_ota_example(env, extra_data):
binary_file = os.path.join(dut1.app.binary_path, 'simple_ota.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('simple_ota_bin_size', '{}KB'.format(bin_size // 1024))
sha256_bootloader, sha256_app = calc_all_sha256(dut1)
# start test
host_ip = get_my_ip()
thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8000))
@ -112,6 +136,8 @@ def test_examples_protocol_simple_ota_example(env, extra_data):
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset 0x10000', timeout=30)
check_sha256(sha256_bootloader, dut1.expect(re.compile(r'SHA-256 for bootloader:\s+([a-f0-9]+)'))[0])
check_sha256(sha256_app, dut1.expect(re.compile(r'SHA-256 for current firmware:\s+([a-f0-9]+)'))[0])
try:
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
@ -248,6 +274,7 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
binary_file = os.path.join(dut1.app.binary_path, 'simple_ota.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('simple_ota_bin_size', '{}KB'.format(bin_size // 1024))
sha256_bootloader, sha256_app = calc_all_sha256(dut1)
# start test
host_ip = get_my_ip()
thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8000))
@ -255,6 +282,8 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset 0x20000', timeout=30)
check_sha256(sha256_bootloader, dut1.expect(re.compile(r'SHA-256 for bootloader:\s+([a-f0-9]+)'))[0])
check_sha256(sha256_app, dut1.expect(re.compile(r'SHA-256 for current firmware:\s+([a-f0-9]+)'))[0])
try:
ip_address = dut1.expect(re.compile(r' eth ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
@ -286,6 +315,7 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
binary_file = os.path.join(dut1.app.binary_path, 'simple_ota.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('simple_ota_bin_size', '{}KB'.format(bin_size // 1024))
sha256_bootloader, sha256_app = calc_all_sha256(dut1)
# start test
host_ip = get_my_ip()
thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8000))
@ -293,6 +323,8 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset 0x20000', timeout=30)
check_sha256(sha256_bootloader, dut1.expect(re.compile(r'SHA-256 for bootloader:\s+([a-f0-9]+)'))[0])
check_sha256(sha256_app, dut1.expect(re.compile(r'SHA-256 for current firmware:\s+([a-f0-9]+)'))[0])
try:
ip_address = dut1.expect(re.compile(r' eth ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))

Wyświetl plik

@ -25,6 +25,8 @@
#include "esp_wifi.h"
#endif
#define HASH_LEN 32
#ifdef CONFIG_EXAMPLE_FIRMWARE_UPGRADE_BIND_IF
/* The interface name value can refer to if_desc in esp_netif_defaults.h */
#if CONFIG_EXAMPLE_FIRMWARE_UPGRADE_BIND_IF_ETH
@ -120,6 +122,33 @@ void simple_ota_example_task(void *pvParameter)
}
}
static void print_sha256(const uint8_t *image_hash, const char *label)
{
char hash_print[HASH_LEN * 2 + 1];
hash_print[HASH_LEN * 2] = 0;
for (int i = 0; i < HASH_LEN; ++i) {
sprintf(&hash_print[i * 2], "%02x", image_hash[i]);
}
ESP_LOGI(TAG, "%s %s", label, hash_print);
}
static void get_sha256_of_partitions(void)
{
uint8_t sha_256[HASH_LEN] = { 0 };
esp_partition_t partition;
// get sha256 digest for bootloader
partition.address = ESP_BOOTLOADER_OFFSET;
partition.size = ESP_PARTITION_TABLE_OFFSET;
partition.type = ESP_PARTITION_TYPE_APP;
esp_partition_get_sha256(&partition, sha_256);
print_sha256(sha_256, "SHA-256 for bootloader: ");
// get sha256 digest for running partition
esp_partition_get_sha256(esp_ota_get_running_partition(), sha_256);
print_sha256(sha_256, "SHA-256 for current firmware: ");
}
void app_main(void)
{
// Initialize NVS.
@ -134,6 +163,8 @@ void app_main(void)
}
ESP_ERROR_CHECK(err);
get_sha256_of_partitions();
ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default());

Wyświetl plik

@ -14,6 +14,7 @@
""" DUT for IDF applications """
import functools
import io
import os
import os.path
import re
@ -309,6 +310,32 @@ class IDFDUT(DUT.SerialDUT):
else:
raise last_error
def image_info(self, path_to_file):
"""
get hash256 of app
:param: path: path to file
:return: sha256 appended to app
"""
old_stdout = sys.stdout
new_stdout = io.StringIO()
sys.stdout = new_stdout
class Args(object):
def __init__(self, attributes):
for key, value in attributes.items():
self.__setattr__(key, value)
args = Args({
'chip': self.TARGET,
'filename': path_to_file,
})
esptool.image_info(args)
output = new_stdout.getvalue()
sys.stdout = old_stdout
return output
@_uses_esptool
def reset(self, esp):
"""