diff --git a/examples/bluetooth/nimble/blecent/blecent_test.py b/examples/bluetooth/nimble/blecent/blecent_test.py index b409bb981a..2e09579b52 100644 --- a/examples/bluetooth/nimble/blecent/blecent_test.py +++ b/examples/bluetooth/nimble/blecent/blecent_test.py @@ -15,63 +15,39 @@ # limitations under the License. from __future__ import print_function -import os -import re -import uuid -import subprocess -from tiny_test_fw import Utility +import os +import subprocess +import threading +import traceback + +try: + import Queue +except ImportError: + import queue as Queue + import ttfw_idf from ble import lib_ble_client +from tiny_test_fw import Utility # When running on local machine execute the following before running this script # > make app bootloader # > make print_flash_cmd | tail -n 1 > build/download.config -@ttfw_idf.idf_example_test(env_tag="Example_WIFI_BT") -def test_example_app_ble_central(env, extra_data): - """ - Steps: - 1. Discover Bluetooth Adapter and Power On - """ - +def blecent_client_task(dut): interface = 'hci0' - adv_host_name = "BleCentTestApp" - adv_iface_index = 0 + adv_host_name = 'BleCentTestApp' adv_type = 'peripheral' adv_uuid = '1811' - subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*']) - subprocess.check_output(['hciconfig','hci0','reset']) - # Acquire DUT - dut = env.get_dut("blecent", "examples/bluetooth/nimble/blecent", dut_class=ttfw_idf.ESP32DUT) - - # Get binary file - binary_file = os.path.join(dut.app.binary_path, "blecent.bin") - bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance("blecent_bin_size", "{}KB".format(bin_size // 1024)) - - # Upload binary and start testing - Utility.console_log("Starting blecent example test app") - dut.start_app() - dut.reset() - - device_addr = ':'.join(re.findall('..', '%012x' % uuid.getnode())) - # Get BLE client module - ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface) - if not ble_client_obj: - raise RuntimeError("Get DBus-Bluez object failed !!") + ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface) # Discover Bluetooth Adapter and power on is_adapter_set = ble_client_obj.set_adapter() if not is_adapter_set: - raise RuntimeError("Adapter Power On failed !!") - - # Write device address to dut - dut.expect("BLE Host Task Started", timeout=60) - dut.write(device_addr + "\n") + return ''' Blecent application run: @@ -81,28 +57,83 @@ def test_example_app_ble_central(env, extra_data): Register advertisement Start advertising ''' - ble_client_obj.start_advertising(adv_host_name, adv_iface_index, adv_type, adv_uuid) + # Create Gatt Application + # Register GATT Application + ble_client_obj.register_gatt_app() - # Call disconnect to perform cleanup operations before exiting application - ble_client_obj.disconnect() + # Register Advertisement + # Check read/write/subscribe is received from device + ble_client_obj.register_adv(adv_host_name, adv_type, adv_uuid) # Check dut responses - dut.expect("Connection established", timeout=60) + dut.expect('Connection established', timeout=30) + dut.expect('Service discovery complete; status=0', timeout=30) + dut.expect('GATT procedure initiated: read;', timeout=30) + dut.expect('Read complete; status=0', timeout=30) + dut.expect('GATT procedure initiated: write;', timeout=30) + dut.expect('Write complete; status=0', timeout=30) + dut.expect('GATT procedure initiated: write;', timeout=30) + dut.expect('Subscribe complete; status=0', timeout=30) + dut.expect('received notification;', timeout=30) - dut.expect("Service discovery complete; status=0", timeout=60) - print("Service discovery passed\n\tService Discovery Status: 0") - dut.expect("GATT procedure initiated: read;", timeout=60) - dut.expect("Read complete; status=0", timeout=60) - print("Read passed\n\tSupportedNewAlertCategoryCharacteristic\n\tRead Status: 0") +class BleCentThread(threading.Thread): + def __init__(self, dut, exceptions_queue): + threading.Thread.__init__(self) + self.dut = dut + self.exceptions_queue = exceptions_queue - dut.expect("GATT procedure initiated: write;", timeout=60) - dut.expect("Write complete; status=0", timeout=60) - print("Write passed\n\tAlertNotificationControlPointCharacteristic\n\tWrite Status: 0") + def run(self): + try: + blecent_client_task(self.dut) + except RuntimeError: + self.exceptions_queue.put(traceback.format_exc(), block=False) - dut.expect("GATT procedure initiated: write;", timeout=60) - dut.expect("Subscribe complete; status=0", timeout=60) - print("Subscribe passed\n\tClientCharacteristicConfigurationDescriptor\n\tSubscribe Status: 0") + +@ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT') +def test_example_app_ble_central(env, extra_data): + """ + Steps: + 1. Discover Bluetooth Adapter and Power On + 2. Connect BLE Device + 3. Start Notifications + 4. Updated value is retrieved + 5. Stop Notifications + """ + # Remove cached bluetooth devices of any previous connections + subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*']) + subprocess.check_output(['hciconfig', 'hci0', 'reset']) + + # Acquire DUT + dut = env.get_dut('blecent', 'examples/bluetooth/nimble/blecent', dut_class=ttfw_idf.ESP32DUT) + + # Get binary file + binary_file = os.path.join(dut.app.binary_path, 'blecent.bin') + bin_size = os.path.getsize(binary_file) + ttfw_idf.log_performance('blecent_bin_size', '{}KB'.format(bin_size // 1024)) + + # Upload binary and start testing + Utility.console_log('Starting blecent example test app') + dut.start_app() + dut.reset() + + exceptions_queue = Queue.Queue() + # Starting a py-client in a separate thread + blehr_thread_obj = BleCentThread(dut, exceptions_queue) + blehr_thread_obj.start() + blehr_thread_obj.join() + + exception_msg = None + while True: + try: + exception_msg = exceptions_queue.get(block=False) + except Queue.Empty: + break + else: + Utility.console_log('\n' + exception_msg) + + if exception_msg: + raise Exception('Blecent thread did not run successfully') if __name__ == '__main__': diff --git a/examples/bluetooth/nimble/blehr/blehr_test.py b/examples/bluetooth/nimble/blehr/blehr_test.py index 223f57dbc4..41c36fcf00 100644 --- a/examples/bluetooth/nimble/blehr/blehr_test.py +++ b/examples/bluetooth/nimble/blehr/blehr_test.py @@ -15,57 +15,56 @@ # limitations under the License. from __future__ import print_function + import os import re +import subprocess import threading import traceback -import subprocess - -from tiny_test_fw import Utility -import ttfw_idf -from ble import lib_ble_client try: import Queue except ImportError: import queue as Queue +import ttfw_idf +from ble import lib_ble_client +from tiny_test_fw import Utility + # When running on local machine execute the following before running this script # > make app bootloader # > make print_flash_cmd | tail -n 1 > build/download.config -def blehr_client_task(hr_obj, dut_addr): +def blehr_client_task(hr_obj, dut, dut_addr): interface = 'hci0' ble_devname = 'blehr_sensor_1.0' hr_srv_uuid = '180d' hr_char_uuid = '2a37' # Get BLE client module - ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface, devname=ble_devname, devaddr=dut_addr) - if not ble_client_obj: - raise RuntimeError("Failed to get DBus-Bluez object") + ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface) # Discover Bluetooth Adapter and power on is_adapter_set = ble_client_obj.set_adapter() if not is_adapter_set: - raise RuntimeError("Adapter Power On failed !!") + return # Connect BLE Device - is_connected = ble_client_obj.connect() + is_connected = ble_client_obj.connect( + devname=ble_devname, + devaddr=dut_addr) if not is_connected: - # Call disconnect to perform cleanup operations before exiting application - ble_client_obj.disconnect() - raise RuntimeError("Connection to device " + str(ble_devname) + " failed !!") + return - # Read Services - services_ret = ble_client_obj.get_services() - if services_ret: - Utility.console_log("\nServices\n") - Utility.console_log(str(services_ret)) - else: + # Get services of the connected device + services = ble_client_obj.get_services() + if not services: ble_client_obj.disconnect() - raise RuntimeError("Failure: Read Services failed") + return + + # Get characteristics of the connected device + ble_client_obj.get_chars() ''' Blehr application run: @@ -73,30 +72,52 @@ def blehr_client_task(hr_obj, dut_addr): Retrieve updated value Stop Notifications ''' - blehr_ret = ble_client_obj.hr_update_simulation(hr_srv_uuid, hr_char_uuid) - if blehr_ret: - Utility.console_log("Success: blehr example test passed") + # Get service if exists + service = ble_client_obj.get_service_if_exists(hr_srv_uuid) + if service: + # Get characteristic if exists + char = ble_client_obj.get_char_if_exists(hr_char_uuid) + if not char: + ble_client_obj.disconnect() + return else: - raise RuntimeError("Failure: blehr example test failed") + ble_client_obj.disconnect() + return + + # Start Notify + # Read updated value + # Stop Notify + notify = ble_client_obj.start_notify(char) + if not notify: + ble_client_obj.disconnect() + return + + # Check dut responses + dut.expect('subscribe event; cur_notify=1', timeout=30) + dut.expect('subscribe event; cur_notify=0', timeout=30) # Call disconnect to perform cleanup operations before exiting application ble_client_obj.disconnect() + # Check dut responses + dut.expect('disconnect;', timeout=30) + class BleHRThread(threading.Thread): - def __init__(self, dut_addr, exceptions_queue): + def __init__(self, dut, dut_addr, exceptions_queue): threading.Thread.__init__(self) + self.dut = dut self.dut_addr = dut_addr self.exceptions_queue = exceptions_queue def run(self): try: - blehr_client_task(self, self.dut_addr) - except Exception: + blehr_client_task(self, self.dut, self.dut_addr) + except RuntimeError: self.exceptions_queue.put(traceback.format_exc(), block=False) -@ttfw_idf.idf_example_test(env_tag="Example_WIFI_BT") +@ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT') def test_example_app_ble_hr(env, extra_data): """ Steps: @@ -106,28 +127,28 @@ def test_example_app_ble_hr(env, extra_data): 4. Updated value is retrieved 5. Stop Notifications """ - subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*']) - subprocess.check_output(['hciconfig','hci0','reset']) + # Remove cached bluetooth devices of any previous connections + subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*']) + subprocess.check_output(['hciconfig', 'hci0', 'reset']) # Acquire DUT - dut = env.get_dut("blehr", "examples/bluetooth/nimble/blehr", dut_class=ttfw_idf.ESP32DUT) + dut = env.get_dut('blehr', 'examples/bluetooth/nimble/blehr', dut_class=ttfw_idf.ESP32DUT) # Get binary file - binary_file = os.path.join(dut.app.binary_path, "blehr.bin") + binary_file = os.path.join(dut.app.binary_path, 'blehr.bin') bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance("blehr_bin_size", "{}KB".format(bin_size // 1024)) - ttfw_idf.check_performance("blehr_bin_size", bin_size // 1024) + ttfw_idf.log_performance('blehr_bin_size', '{}KB'.format(bin_size // 1024)) # Upload binary and start testing - Utility.console_log("Starting blehr simple example test app") + Utility.console_log('Starting blehr simple example test app') dut.start_app() dut.reset() # Get device address from dut - dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0] + dut_addr = dut.expect(re.compile(r'Device Address: ([a-fA-F0-9:]+)'), timeout=30)[0] exceptions_queue = Queue.Queue() # Starting a py-client in a separate thread - blehr_thread_obj = BleHRThread(dut_addr, exceptions_queue) + blehr_thread_obj = BleHRThread(dut, dut_addr, exceptions_queue) blehr_thread_obj.start() blehr_thread_obj.join() @@ -138,15 +159,10 @@ def test_example_app_ble_hr(env, extra_data): except Queue.Empty: break else: - Utility.console_log("\n" + exception_msg) + Utility.console_log('\n' + exception_msg) if exception_msg: - raise Exception("Thread did not run successfully") - - # Check dut responses - dut.expect("subscribe event; cur_notify=1", timeout=30) - dut.expect("subscribe event; cur_notify=0", timeout=30) - dut.expect("disconnect;", timeout=30) + raise Exception('Blehr thread did not run successfully') if __name__ == '__main__': diff --git a/examples/bluetooth/nimble/bleprph/bleprph_test.py b/examples/bluetooth/nimble/bleprph/bleprph_test.py index 2ae2289871..978c1d004a 100644 --- a/examples/bluetooth/nimble/bleprph/bleprph_test.py +++ b/examples/bluetooth/nimble/bleprph/bleprph_test.py @@ -15,20 +15,21 @@ # limitations under the License. from __future__ import print_function + import os import re -import traceback -import threading import subprocess +import threading +import traceback try: import Queue except ImportError: import queue as Queue -from tiny_test_fw import Utility import ttfw_idf from ble import lib_ble_client +from tiny_test_fw import Utility # When running on local machine execute the following before running this script # > make app bootloader @@ -36,73 +37,57 @@ from ble import lib_ble_client # > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw -def bleprph_client_task(prph_obj, dut, dut_addr): +def bleprph_client_task(dut, dut_addr): interface = 'hci0' ble_devname = 'nimble-bleprph' srv_uuid = '2f12' + write_value = b'A' # Get BLE client module - ble_client_obj = lib_ble_client.BLE_Bluez_Client(interface, devname=ble_devname, devaddr=dut_addr) - if not ble_client_obj: - raise RuntimeError("Failed to get DBus-Bluez object") + ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface) # Discover Bluetooth Adapter and power on is_adapter_set = ble_client_obj.set_adapter() if not is_adapter_set: - raise RuntimeError("Adapter Power On failed !!") + return # Connect BLE Device - is_connected = ble_client_obj.connect() + is_connected = ble_client_obj.connect( + devname=ble_devname, + devaddr=dut_addr) if not is_connected: - # Call disconnect to perform cleanup operations before exiting application + return + + # Get services of the connected device + services = ble_client_obj.get_services() + if not services: ble_client_obj.disconnect() - raise RuntimeError("Connection to device " + ble_devname + " failed !!") + return + # Verify service uuid exists + service_exists = ble_client_obj.get_service_if_exists(srv_uuid) + if not service_exists: + ble_client_obj.disconnect() + return + + # Get characteristics of the connected device + ble_client_obj.get_chars() + + # Read properties of characteristics (uuid, value and permissions) + ble_client_obj.read_chars() + + # Write new value to characteristic having read and write permission + # and display updated value + write_char = ble_client_obj.write_chars(write_value) + if not write_char: + ble_client_obj.disconnect() + return + + # Disconnect device + ble_client_obj.disconnect() # Check dut responses - dut.expect("GAP procedure initiated: advertise;", timeout=30) - - # Read Services - services_ret = ble_client_obj.get_services(srv_uuid) - if services_ret: - Utility.console_log("\nServices\n") - Utility.console_log(str(services_ret)) - else: - ble_client_obj.disconnect() - raise RuntimeError("Failure: Read Services failed") - - # Read Characteristics - chars_ret = {} - chars_ret = ble_client_obj.read_chars() - if chars_ret: - Utility.console_log("\nCharacteristics retrieved") - for path, props in chars_ret.items(): - Utility.console_log("\n\tCharacteristic: " + str(path)) - Utility.console_log("\tCharacteristic UUID: " + str(props[2])) - Utility.console_log("\tValue: " + str(props[0])) - Utility.console_log("\tProperties: : " + str(props[1])) - else: - ble_client_obj.disconnect() - raise RuntimeError("Failure: Read Characteristics failed") - - ''' - Write Characteristics - - write 'A' to characteristic with write permission - ''' - chars_ret_on_write = {} - chars_ret_on_write = ble_client_obj.write_chars(b'A') - if chars_ret_on_write: - Utility.console_log("\nCharacteristics after write operation") - for path, props in chars_ret_on_write.items(): - Utility.console_log("\n\tCharacteristic:" + str(path)) - Utility.console_log("\tCharacteristic UUID: " + str(props[2])) - Utility.console_log("\tValue:" + str(props[0])) - Utility.console_log("\tProperties: : " + str(props[1])) - else: - ble_client_obj.disconnect() - raise RuntimeError("Failure: Write Characteristics failed") - - # Call disconnect to perform cleanup operations before exiting application - ble_client_obj.disconnect() + dut.expect('connection established; status=0', timeout=30) + dut.expect('disconnect;', timeout=30) class BlePrphThread(threading.Thread): @@ -114,12 +99,12 @@ class BlePrphThread(threading.Thread): def run(self): try: - bleprph_client_task(self, self.dut, self.dut_addr) - except Exception: + bleprph_client_task(self.dut, self.dut_addr) + except RuntimeError: self.exceptions_queue.put(traceback.format_exc(), block=False) -@ttfw_idf.idf_example_test(env_tag="Example_WIFI_BT") +@ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT') def test_example_app_ble_peripheral(env, extra_data): """ Steps: @@ -129,28 +114,31 @@ def test_example_app_ble_peripheral(env, extra_data): 4. Read Characteristics 5. Write Characteristics """ - subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*']) - subprocess.check_output(['hciconfig','hci0','reset']) + # Remove cached bluetooth devices of any previous connections + subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*']) + subprocess.check_output(['hciconfig', 'hci0', 'reset']) # Acquire DUT - dut = env.get_dut("bleprph", "examples/bluetooth/nimble/bleprph", dut_class=ttfw_idf.ESP32DUT) + dut = env.get_dut('bleprph', 'examples/bluetooth/nimble/bleprph', dut_class=ttfw_idf.ESP32DUT) # Get binary file - binary_file = os.path.join(dut.app.binary_path, "bleprph.bin") + binary_file = os.path.join(dut.app.binary_path, 'bleprph.bin') bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance("bleprph_bin_size", "{}KB".format(bin_size // 1024)) - ttfw_idf.check_performance("bleprph_bin_size", bin_size // 1024) + ttfw_idf.log_performance('bleprph_bin_size', '{}KB'.format(bin_size // 1024)) # Upload binary and start testing - Utility.console_log("Starting bleprph simple example test app") + Utility.console_log('Starting bleprph simple example test app') dut.start_app() dut.reset() # Get device address from dut - dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0] + dut_addr = dut.expect(re.compile(r'Device Address: ([a-fA-F0-9:]+)'), timeout=30)[0] + + # Check dut responses + dut.expect('GAP procedure initiated: advertise;', timeout=30) - exceptions_queue = Queue.Queue() # Starting a py-client in a separate thread + exceptions_queue = Queue.Queue() bleprph_thread_obj = BlePrphThread(dut, dut_addr, exceptions_queue) bleprph_thread_obj.start() bleprph_thread_obj.join() @@ -162,14 +150,10 @@ def test_example_app_ble_peripheral(env, extra_data): except Queue.Empty: break else: - Utility.console_log("\n" + exception_msg) + Utility.console_log('\n' + exception_msg) if exception_msg: - raise Exception("Thread did not run successfully") - - # Check dut responses - dut.expect("connection established; status=0", timeout=30) - dut.expect("disconnect;", timeout=30) + raise Exception('BlePrph thread did not run successfully') if __name__ == '__main__': diff --git a/examples/provisioning/softap_prov/softap_prov_test.py b/examples/provisioning/softap_prov/softap_prov_test.py index 8d74d9ad6d..8368b60ca7 100644 --- a/examples/provisioning/softap_prov/softap_prov_test.py +++ b/examples/provisioning/softap_prov/softap_prov_test.py @@ -15,89 +15,102 @@ # limitations under the License. from __future__ import print_function -import re -import os -import ttfw_idf +import os +import re + import esp_prov +import tiny_test_fw +import ttfw_idf import wifi_tools +from tiny_test_fw import Utility # Have esp_prov throw exception esp_prov.config_throw_except = True -@ttfw_idf.idf_example_test(env_tag="Example_WIFI_BT") +@ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT') def test_examples_provisioning_softap(env, extra_data): # Acquire DUT - dut1 = env.get_dut("softap_prov", "examples/provisioning/softap_prov", dut_class=ttfw_idf.ESP32DUT) + dut1 = env.get_dut('softap_prov', 'examples/provisioning/softap_prov', dut_class=ttfw_idf.ESP32DUT) # Get binary file - binary_file = os.path.join(dut1.app.binary_path, "softap_prov.bin") + binary_file = os.path.join(dut1.app.binary_path, 'softap_prov.bin') bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance("softap_prov_bin_size", "{}KB".format(bin_size // 1024)) - ttfw_idf.check_performance("softap_prov_bin_size", bin_size // 1024) + ttfw_idf.log_performance('softap_prov_bin_size', '{}KB'.format(bin_size // 1024)) # Upload binary and start testing dut1.start_app() # Parse IP address of STA - dut1.expect("Starting WiFi SoftAP provisioning", timeout=60) + dut1.expect('Starting WiFi SoftAP provisioning', timeout=60) [ssid, password] = dut1.expect(re.compile(r"SoftAP Provisioning started with SSID '(\S+)', Password '(\S+)'"), timeout=30) iface = wifi_tools.get_wiface_name() if iface is None: - raise RuntimeError("Failed to get Wi-Fi interface on host") - print("Interface name : " + iface) - print("SoftAP SSID : " + ssid) - print("SoftAP Password : " + password) + raise RuntimeError('Failed to get Wi-Fi interface on host') + print('Interface name : ' + iface) + print('SoftAP SSID : ' + ssid) + print('SoftAP Password : ' + password) try: ctrl = wifi_tools.wpa_cli(iface, reset_on_exit=True) - print("Connecting to DUT SoftAP...") - ip = ctrl.connect(ssid, password) - got_ip = dut1.expect(re.compile(r"DHCP server assigned IP to a station, IP is: (\d+.\d+.\d+.\d+)"), timeout=60)[0] - if ip != got_ip: - raise RuntimeError("SoftAP connected to another host! " + ip + "!=" + got_ip) - print("Connected to DUT SoftAP") + print('Connecting to DUT SoftAP...') + try: + ip = ctrl.connect(ssid, password) + except RuntimeError as err: + Utility.console_log('error: {}'.format(err)) + try: + got_ip = dut1.expect(re.compile(r'DHCP server assigned IP to a station, IP is: (\d+.\d+.\d+.\d+)'), timeout=60) + Utility.console_log('got_ip: {}'.format(got_ip)) + got_ip = got_ip[0] + if ip != got_ip: + raise RuntimeError('SoftAP connected to another host! {} != {}'.format(ip, got_ip)) + except tiny_test_fw.DUT.ExpectTimeout: + # print what is happening on dut side + Utility.console_log('in exception tiny_test_fw.DUT.ExpectTimeout') + Utility.console_log(dut1.read()) + raise + print('Connected to DUT SoftAP') - print("Starting Provisioning") + print('Starting Provisioning') verbose = False - protover = "V0.1" + protover = 'V0.1' secver = 1 - pop = "abcd1234" - provmode = "softap" - ap_ssid = "myssid" - ap_password = "mypassword" - softap_endpoint = ip.split('.')[0] + "." + ip.split('.')[1] + "." + ip.split('.')[2] + ".1:80" + pop = 'abcd1234' + provmode = 'softap' + ap_ssid = 'myssid' + ap_password = 'mypassword' + softap_endpoint = '{}.{}.{}.1:80'.format(ip.split('.')[0], ip.split('.')[1], ip.split('.')[2]) - print("Getting security") + print('Getting security') security = esp_prov.get_security(secver, pop, verbose) if security is None: - raise RuntimeError("Failed to get security") + raise RuntimeError('Failed to get security') - print("Getting transport") + print('Getting transport') transport = esp_prov.get_transport(provmode, softap_endpoint) if transport is None: - raise RuntimeError("Failed to get transport") + raise RuntimeError('Failed to get transport') - print("Verifying protocol version") + print('Verifying protocol version') if not esp_prov.version_match(transport, protover): - raise RuntimeError("Mismatch in protocol version") + raise RuntimeError('Mismatch in protocol version') - print("Starting Session") + print('Starting Session') if not esp_prov.establish_session(transport, security): - raise RuntimeError("Failed to start session") + raise RuntimeError('Failed to start session') - print("Sending Wifi credential to DUT") + print('Sending Wifi credential to DUT') if not esp_prov.send_wifi_config(transport, security, ap_ssid, ap_password): - raise RuntimeError("Failed to send Wi-Fi config") + raise RuntimeError('Failed to send Wi-Fi config') - print("Applying config") + print('Applying config') if not esp_prov.apply_wifi_config(transport, security): - raise RuntimeError("Failed to send apply config") + raise RuntimeError('Failed to send apply config') if not esp_prov.wait_wifi_connected(transport, security): - raise RuntimeError("Provisioning failed") + raise RuntimeError('Provisioning failed') finally: ctrl.reset() diff --git a/tools/ble/lib_ble_client.py b/tools/ble/lib_ble_client.py index 73be5187fb..392df38222 100644 --- a/tools/ble/lib_ble_client.py +++ b/tools/ble/lib_ble_client.py @@ -18,12 +18,11 @@ # DBus-Bluez BLE library from __future__ import print_function + import sys import time -import traceback try: - from future.moves.itertools import zip_longest import dbus import dbus.mainloop.glib from gi.repository import GLib @@ -31,44 +30,11 @@ except ImportError as e: if 'linux' not in sys.platform: raise e print(e) - print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue") - print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue") + print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue') + print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue') raise -from . import lib_gatt -from . import lib_gap - -srv_added_old_cnt = 0 -srv_added_new_cnt = 0 -verify_signal_check = 0 -blecent_retry_check_cnt = 0 -gatt_app_retry_check_cnt = 0 -verify_service_cnt = 0 -verify_readchars_cnt = 0 -adv_retry_check_cnt = 0 -blecent_adv_uuid = '1811' -gatt_app_obj_check = False -gatt_app_reg_check = False -adv_checks_done = False -gatt_checks_done = False -adv_data_check = False -adv_reg_check = False -read_req_check = False -write_req_check = False -subscribe_req_check = False -ble_hr_chrc = False -discovery_start = False -signal_caught = False -test_checks_pass = False -adv_stop = False -services_resolved = False -service_uuid_found = False -adapter_on = False -device_connected = False -gatt_app_registered = False -adv_registered = False -adv_active_instance = False -chrc_value_cnt = False +from . import lib_gap, lib_gatt BLUEZ_SERVICE_NAME = 'org.bluez' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' @@ -84,811 +50,663 @@ GATT_SERVICE_IFACE = 'org.bluez.GattService1' GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' -# Set up the main loop. -dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) -dbus.mainloop.glib.threads_init() -# Set up the event main loop. -event_loop = GLib.MainLoop() +class DBusException(dbus.exceptions.DBusException): + pass -def verify_signal_is_caught(): - global verify_signal_check - verify_signal_check += 1 - - if (not signal_caught and verify_signal_check == 15) or (signal_caught): - if event_loop.is_running(): - event_loop.quit() - return False # polling for checks will stop - - return True # polling will continue +class Characteristic: + def __init__(self): + self.iface = None + self.path = None + self.props = None -def set_props_status(props): - """ - Set Adapter status if it is powered on or off - """ - global adapter_on, services_resolved, GATT_OBJ_REMOVED, gatt_app_registered, \ - adv_registered, adv_active_instance, device_connected, CHRC_VALUE, chrc_value_cnt, \ - signal_caught - is_service_uuid = False - # Signal caught for change in Adapter Powered property - if 'Powered' in props: - if props['Powered'] == 1: - signal_caught = True - adapter_on = True - else: - signal_caught = True - adapter_on = False - if 'ServicesResolved' in props: - if props['ServicesResolved'] == 1: - signal_caught = True - services_resolved = True - else: - signal_caught = True - services_resolved = False - if 'UUIDs' in props: - # Signal caught for add/remove GATT data having service uuid - for uuid in props['UUIDs']: - if blecent_adv_uuid in uuid: - is_service_uuid = True - if not is_service_uuid: - # Signal caught for removing GATT data having service uuid - # and for unregistering GATT application - gatt_app_registered = False - lib_gatt.GATT_APP_OBJ = False - if 'ActiveInstances' in props: - # Signal caught for Advertising - add/remove Instances property - if props['ActiveInstances'] == 1: - adv_active_instance = True - elif props['ActiveInstances'] == 0: - adv_active_instance = False - adv_registered = False - lib_gap.ADV_OBJ = False - if 'Connected' in props: - # Signal caught for device connect/disconnect - if props['Connected'] == 1: - signal_caught = True - device_connected = True - else: - signal_caught = True - device_connected = False - if 'Value' in props: - # Signal caught for change in chars value - if ble_hr_chrc: - chrc_value_cnt += 1 - print(props['Value']) - if chrc_value_cnt == 10: - signal_caught = True - return False - - return False +class Service: + def __init__(self): + self.iface = None + self.path = None + self.props = None + self.chars = [] -def props_change_handler(iface, changed_props, invalidated): - """ - PropertiesChanged Signal handler. - Catch and print information about PropertiesChanged signal. - """ +class Device: + def __init__(self): + self.iface = None + self.path = None + self.props = None + self.name = None + self.addr = None + self.services = [] - if iface == ADAPTER_IFACE: - set_props_status(changed_props) - if iface == LE_ADVERTISING_MANAGER_IFACE: - set_props_status(changed_props) - if iface == DEVICE_IFACE: - set_props_status(changed_props) - if iface == GATT_CHRC_IFACE: - set_props_status(changed_props) + +class Adapter: + def __init__(self): + self.iface = None + self.path = None + self.props = None class BLE_Bluez_Client: - def __init__(self, iface, devname=None, devaddr=None): + def __init__(self, iface=None): self.bus = None + self.hci_iface = iface + self.adapter = Adapter() self.device = None - self.devname = devname - self.devaddr = devaddr - self.iface = iface - self.ble_objs = None - self.props_iface_obj = None - self.adapter_path = [] - self.adapter = None - self.services = [] - self.srv_uuid = [] - self.chars = {} - self.char_uuid = [] + self.gatt_app = None + self.gatt_mgr = None + self.mainloop = None + self.loop_cnt = 0 try: + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) self.bus = dbus.SystemBus() - om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) - self.ble_objs = om_iface_obj.GetManagedObjects() - - except Exception as e: - print(e) + except dbus.exceptions.DBusException as dbus_err: + raise DBusException('Failed to initialise client: {}'.format(dbus_err)) + except Exception as err: + raise Exception('Failed to initialise client: {}'.format(err)) def __del__(self): try: - print("Test Exit") + # Cleanup + self.disconnect() + print('Test Exit') except Exception as e: print(e) - sys.exit(1) def set_adapter(self): ''' Discover Bluetooth Adapter Power On Bluetooth Adapter ''' - global verify_signal_check, signal_caught, adapter_on - verify_signal_check = 0 - adapter_on = False try: - print("discovering adapter...") - for path, interfaces in self.ble_objs.items(): + print('discovering adapter') + dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) + dbus_objs = dbus_obj_mgr.GetManagedObjects() + for path, interfaces in dbus_objs.items(): adapter = interfaces.get(ADAPTER_IFACE) - if adapter is not None: - if path.endswith(self.iface): - self.adapter = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE) - # Create Properties Interface object only after adapter is found - self.props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE) - self.adapter_path = [path, interfaces] - # Check adapter status - power on/off - set_props_status(interfaces[ADAPTER_IFACE]) - break + if adapter is not None and path.endswith(self.hci_iface): + self.adapter.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE) + self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE) + self.adapter.path = path + break - if self.adapter is None: - raise Exception("Bluetooth adapter not found") - - if self.props_iface_obj is None: - raise Exception("Properties interface not found") - - print("bluetooth adapter discovered") + if self.adapter.iface is None: + print('bluetooth adapter not found') + return False + print('bluetooth adapter discovered') + print('checking if bluetooth adapter is already powered on') # Check if adapter is already powered on - if adapter_on: - print("Adapter already powered on") + powered = self.adapter.props.Get(ADAPTER_IFACE, 'Powered') + if powered == 1: + print('adapter already powered on') return True - # Power On Adapter - print("powering on adapter...") - self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) - self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(1)) + print('powering on adapter') + self.adapter.props.Set(ADAPTER_IFACE, 'Powered', dbus.Boolean(1)) + # Check if adapter is powered on + print('checking if adapter is powered on') + for cnt in range(10, 0, -1): + time.sleep(5) + powered_on = self.adapter.props.Get(ADAPTER_IFACE, 'Powered') + if powered_on == 1: + # Set adapter props again with powered on value + self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), DBUS_PROP_IFACE) + print('bluetooth adapter powered on') + return True + print('number of retries left({})'.format(cnt - 1)) - signal_caught = False - GLib.timeout_add_seconds(5, verify_signal_is_caught) - event_loop.run() - - if adapter_on: - print("bluetooth adapter powered on") - return True - else: - raise Exception("Failure: bluetooth adapter not powered on") - - except Exception: - print(traceback.format_exc()) + # Adapter not powered on + print('bluetooth adapter not powered on') return False - def connect(self): + except Exception as err: + raise Exception('Failed to set adapter: {}'.format(err)) + + def connect(self, devname=None, devaddr=None): ''' - Connect to the device discovered - Retry 10 times to discover and connect to device + Start Discovery and Connect to the device ''' - global discovery_start, signal_caught, device_connected, verify_signal_check - device_found = False - device_connected = False try: - self.adapter.StartDiscovery() - print("\nStarted Discovery") + device_found = None + start_discovery = False + self.device = Device() - discovery_start = True + discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering') + # Start Discovery + if discovery_val == 0: + print('starting discovery') + self.adapter.iface.StartDiscovery() + start_discovery = True - for retry_cnt in range(10,0,-1): - verify_signal_check = 0 - try: - if self.device is None: - print("\nConnecting to device...") - # Wait for device to be discovered + for cnt in range(10, 0, -1): + time.sleep(5) + discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering') + if discovery_val == 1: + print('start discovery successful') + break + print('number of retries left ({})'.format(cnt - 1)) + + if discovery_val == 0: + print('start discovery failed') + return False + + # Get device + for cnt in range(10, 0, -1): + # Wait for device to be discovered + time.sleep(5) + device_found = self.get_device( + devname=devname, + devaddr=devaddr) + if device_found: + break + # Retry + print('number of retries left ({})'.format(cnt - 1)) + + if not device_found: + print('expected device {} [ {} ] not found'.format(devname, devaddr)) + return False + + # Connect to expected device found + print('connecting to device {} [ {} ] '.format(self.device.name, self.device.addr)) + self.device.iface.Connect(dbus_interface=DEVICE_IFACE) + for cnt in range(10, 0, -1): + time.sleep(5) + connected = self.device.props.Get(DEVICE_IFACE, 'Connected') + if connected == 1: + # Set device props again with connected on value + self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.device.path), DBUS_PROP_IFACE) + print('connected to device with iface {}'.format(self.device.path)) + return True + print('number of retries left({})'.format(cnt - 1)) + + # Device not connected + print('connection to device failed') + return False + + except Exception as err: + raise Exception('Connect to device failed : {}'.format(err)) + finally: + try: + if start_discovery: + print('stopping discovery') + self.adapter.iface.StopDiscovery() + for cnt in range(10, 0, -1): time.sleep(5) - device_found = self.get_device() - if device_found: - self.device.Connect(dbus_interface=DEVICE_IFACE) - time.sleep(15) - signal_caught = False - GLib.timeout_add_seconds(5, verify_signal_is_caught) - event_loop.run() - if device_connected: - print("\nConnected to device") - return True - else: - raise Exception - except Exception as e: - print(e) - print("\nRetries left", retry_cnt - 1) - continue + discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering') + if discovery_val == 0: + print('stop discovery successful') + break + print('number of retries left ({})'.format(cnt - 1)) + if discovery_val == 1: + print('stop discovery failed') + except dbus.exceptions.DBusException as dbus_err: + print('Warning: Failure during cleanup for device connection : {}'.format(dbus_err)) - # Device not found - return False - - except Exception: - print(traceback.format_exc()) - self.device = None - return False - - def get_device(self): + def get_device(self, devname=None, devaddr=None): ''' - Discover device based on device name - and device address and connect + Get device based on device name + and device address and connect to device ''' dev_path = None + expected_device_addr = devaddr.lower() + expected_device_name = devname.lower() - om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) - self.ble_objs = om_iface_obj.GetManagedObjects() - for path, interfaces in self.ble_objs.items(): + print('checking if expected device {} [ {} ] is present'.format(devname, devaddr)) + + dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) + dbus_objs = dbus_obj_mgr.GetManagedObjects() + + # Check if expected device is present + for path, interfaces in dbus_objs.items(): if DEVICE_IFACE not in interfaces.keys(): continue - device_addr_iface = (path.replace('_', ':')).lower() - dev_addr = self.devaddr.lower() - if dev_addr in device_addr_iface and \ - interfaces[DEVICE_IFACE].get("Name") == self.devname: + + # Check expected device address is received device address + received_device_addr_path = (path.replace('_', ':')).lower() + if expected_device_addr not in received_device_addr_path: + continue + + device_props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE) + received_device_name = device_props.Get(DEVICE_IFACE, 'Name').lower() + + # Check expected device name is received device name + if expected_device_name == received_device_name: + # Set device iface path dev_path = path break - if dev_path is None: - raise Exception("\nBLE device not found") + if not dev_path: + print('\nBLE device not found') + return False - device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE) - device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) + print('device {} [ {} ] found'.format(devname, devaddr)) - self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path) + # Set device details + self.device.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DEVICE_IFACE) + self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE) + self.device.path = dev_path + self.device.name = devname + self.device.addr = devaddr return True - def srvc_iface_added_handler(self, path, interfaces): - ''' - Add services found as lib_ble_client obj - ''' - if self.device and path.startswith(self.device.object_path): - if GATT_SERVICE_IFACE in interfaces.keys(): - service = self.bus.get_object(BLUEZ_SERVICE_NAME, path) - uuid = service.Get(GATT_SERVICE_IFACE, 'UUID', dbus_interface=DBUS_PROP_IFACE) - if uuid not in self.srv_uuid: - self.srv_uuid.append(uuid) - if path not in self.services: - self.services.append(path) - - def verify_service_uuid_found(self, service_uuid): - ''' - Verify service uuid found - ''' - global service_uuid_found - - srv_uuid_found = [uuid for uuid in self.srv_uuid if service_uuid in uuid] - if srv_uuid_found: - service_uuid_found = True - - def get_services(self, service_uuid=None): + def get_services(self): ''' Retrieve Services found in the device connected ''' - global signal_caught, service_uuid_found, services_resolved, verify_signal_check - verify_signal_check = 0 - service_uuid_found = False - services_resolved = False - signal_caught = False - try: - om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) - self.ble_objs = om_iface_obj.GetManagedObjects() - for path, interfaces in self.ble_objs.items(): - self.srvc_iface_added_handler(path, interfaces) - # If services not found, then they may not have been added yet on dbus - if not self.services: - GLib.timeout_add_seconds(5, verify_signal_is_caught) - om_iface_obj.connect_to_signal('InterfacesAdded', self.srvc_iface_added_handler) - event_loop.run() - if not services_resolved: - raise Exception("Services not found...") + # Get current dbus objects + dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) + dbus_objs = dbus_obj_mgr.GetManagedObjects() - if service_uuid: - self.verify_service_uuid_found(service_uuid) - if not service_uuid_found: - raise Exception("Service with uuid: %s not found..." % service_uuid) + # Get services + for path, interfaces in dbus_objs.items(): + if GATT_SERVICE_IFACE in interfaces.keys(): + if not path.startswith(self.device.path): + continue + received_service = self.bus.get_object(BLUEZ_SERVICE_NAME, path) + # Retrieve all services on device iface path + # and set each service received + service = Service() + service.path = path + service.iface = dbus.Interface(received_service, GATT_SERVICE_IFACE) + service.props = dbus.Interface(received_service, DBUS_PROP_IFACE) + self.device.services.append(service) - # Services found - return self.srv_uuid - except Exception: - print(traceback.format_exc()) - return False + if not self.device.services: + print('no services found for device: {}'.format(self.device.path)) + return False - def chrc_iface_added_handler(self, path, interfaces): + return True + + except Exception as err: + raise Exception('Failed to get services: {}'.format(err)) + + def get_chars(self): ''' - Add services found as lib_ble_client obj + Get characteristics of the services set for the device connected ''' - global chrc, chrc_discovered, signal_caught - chrc_val = None + try: + if not self.device.services: + print('No services set for device: {}'.format(self.device.path)) + return - if self.device and path.startswith(self.device.object_path): - if GATT_CHRC_IFACE in interfaces.keys(): - chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path) - chrc_props = chrc.GetAll(GATT_CHRC_IFACE, - dbus_interface=DBUS_PROP_IFACE) - chrc_flags = chrc_props['Flags'] - if 'read' in chrc_flags: - chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) - uuid = chrc_props['UUID'] - self.chars[path] = chrc_val, chrc_flags, uuid - signal_caught = True + # Read chars for all the services received for device + for service in self.device.services: + char_found = False + dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) + dbus_objs = dbus_obj_mgr.GetManagedObjects() + for path, interfaces in dbus_objs.items(): + if GATT_CHRC_IFACE in interfaces.keys(): + if not path.startswith(self.device.path): + continue + if not path.startswith(service.path): + continue + # Set characteristics + received_char = self.bus.get_object(BLUEZ_SERVICE_NAME, path) + char = Characteristic() + char.path = path + char.iface = dbus.Interface(received_char, GATT_CHRC_IFACE) + char.props = dbus.Interface(received_char, DBUS_PROP_IFACE) + service.chars.append(char) + char_found = True + + if not char_found: + print('Characteristic not found for service: {}'.format(service.iface)) + + except Exception as err: + raise Exception('Failed to get characteristics : {}'.format(err)) def read_chars(self): ''' - Read characteristics found in the device connected + Read value of characteristics ''' - global iface_added, chrc_discovered, signal_caught, verify_signal_check - verify_signal_check = 0 - chrc_discovered = False - iface_added = False - signal_caught = False - try: - om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) - self.ble_objs = om_iface_obj.GetManagedObjects() - for path, interfaces in self.ble_objs.items(): - self.chrc_iface_added_handler(path, interfaces) + if not self.device.services: + print('No services set for device: {}'.format(self.device.path)) + return - # If chars not found, then they have not been added yet to interface - time.sleep(15) - if not self.chars: - iface_added = True - GLib.timeout_add_seconds(5, verify_signal_is_caught) - om_iface_obj.connect_to_signal('InterfacesAdded', self.chars_iface_added_handler) - event_loop.run() - return self.chars - except Exception: - print(traceback.format_exc()) - return False + # Read chars for all services of device + for service in self.device.services: + # Read properties of characteristic + for char in service.chars: + # Print path + print('Characteristic: {}'.format(char.path)) + # Print uuid + uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID') + print('UUID: {}'.format(uuid)) + # Print flags + flags = [flag for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')] + print('Flags: {}'.format(flags)) + # Read value if `read` flag is present + if 'read' in flags: + value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) + print('Value: {}'.format(value)) - def write_chars(self, write_val): + except Exception as err: + raise Exception('Failed to read characteristics : {}'.format(err)) + + def write_chars(self, new_value): ''' - Write characteristics to the device connected + Write to characteristics ''' - chrc = None - chrc_val = None - char_write_props = False - try: - for path, props in self.chars.items(): - if 'write' in props[1]: # check permission - char_write_props = True - chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path) - chrc.WriteValue(write_val,{},dbus_interface=GATT_CHRC_IFACE) - if 'read' in props[1]: - chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) - else: - print("Warning: Cannot read value. Characteristic does not have read permission.") - if not (ord(write_val) == int(chrc_val[0])): - print("\nWrite Failed") + if not self.device.services: + print('No services set for device: {}'.format(self.device.path)) + return False + + print('writing data to characteristics with read and write permission') + # Read chars of all services of device + for service in self.device.services: + if not service.chars: + print('No chars found for service: {}'.format(service.path)) + continue + for char in service.chars: + flags = [flag.lower() for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')] + if not ('read' in flags and 'write' in flags): + continue + + # Write new value to characteristic + curr_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) + print('current value: {}'.format(curr_value)) + + print('writing {} to characteristic {}'.format(new_value, char.path)) + char.iface.WriteValue(new_value, {}, dbus_interface=GATT_CHRC_IFACE) + + time.sleep(5) + updated_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) + print('updated value: {}'.format(updated_value)) + + if not (ord(new_value) == int(updated_value[0])): + print('write operation to {} failed'.format(char.path)) return False - self.chars[path] = chrc_val, props[1], props[2] # update value - if not char_write_props: - raise Exception("Failure: Cannot perform write operation. Characteristic does not have write permission.") + print('write operation to {} successful'.format(char.path)) + return True - return self.chars - except Exception: - print(traceback.format_exc()) - return False + except Exception as err: + raise Exception('Failed to write to characteristics: {}'.format(err)) - def hr_update_simulation(self, hr_srv_uuid, hr_char_uuid): + def get_char_if_exists(self, char_uuid): ''' - Start Notifications - Retrieve updated value - Stop Notifications + Get char if exists for given uuid ''' - global ble_hr_chrc, verify_signal_check, signal_caught, chrc_value_cnt - - srv_path = None - chrc = None - uuid = None - chrc_path = None - chars_ret = None - ble_hr_chrc = True - chrc_value_cnt = 0 - try: - # Get HR Measurement characteristic - services = list(zip_longest(self.srv_uuid, self.services)) - for uuid, path in services: - if hr_srv_uuid in uuid: - srv_path = path - break + for service in self.device.services: + for char in service.chars: + curr_uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID') + if char_uuid.lower() in curr_uuid.lower(): + return char + print('char {} not found'.format(char_uuid)) + return False + except Exception as err: + raise Exception('Failed to get char based on uuid {} - {}'.format(char_uuid, err)) - if srv_path is None: - raise Exception("Failure: HR UUID:", hr_srv_uuid, "not found") + def get_service_if_exists(self, service_uuid): + try: + for service in self.device.services: + uuid = service.props.Get(GATT_SERVICE_IFACE, 'UUID') + if service_uuid.lower() in uuid.lower(): + return service + print('service {} not found'.format(service_uuid)) + return False + except Exception as err: + raise Exception('Failed to get service based on uuid {} - {}'.format(service_uuid, err)) - chars_ret = self.read_chars() - - for path, props in chars_ret.items(): - if path.startswith(srv_path): - chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path) - chrc_path = path - if hr_char_uuid in props[2]: # uuid + def start_notify(self, char): + try: + notify_started = 0 + notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying') + if notifying == 0: + # Start Notify + char.iface.StartNotify() + notify_started = 1 + # Check notify started + for _ in range(10, 0, -1): + notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying') + if notifying == 1: + print('subscribe to notifications: on') break - if chrc is None: - raise Exception("Failure: Characteristics for service: ", srv_path, "not found") + if notifying == 0: + print('Failed to start notifications') + return False - # Subscribe to notifications - print("\nSubscribe to notifications: On") - chrc.StartNotify(dbus_interface=GATT_CHRC_IFACE) + # Get updated value + for _ in range(10, 0, -1): + time.sleep(1) + char_value = char.props.Get(GATT_CHRC_IFACE, 'Value') + print(char_value) - chrc_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, chrc_path), DBUS_PROP_IFACE) - chrc_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) + return None - signal_caught = False - verify_signal_check = 0 - GLib.timeout_add_seconds(5, verify_signal_is_caught) - event_loop.run() - chrc.StopNotify(dbus_interface=GATT_CHRC_IFACE) - time.sleep(2) - print("\nSubscribe to notifications: Off") + except Exception as err: + raise Exception('Failed to perform notification operation: {}'.format(err)) + finally: + try: + if notify_started == 1: + # Stop notify + char.iface.StopNotify() + for _ in range(10, 0, -1): + notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying') + if notifying == 0: + print('subscribe to notifications: off') + break + if notifying == 1: + print('Failed to stop notifications') + except dbus.exceptions.DBusException as dbus_err: + print('Warning: Failure during cleanup for start notify : {}'.format(dbus_err)) - ble_hr_chrc = False - return True - - except Exception: - print(traceback.format_exc()) - return False - - def create_and_reg_gatt_app(self): + def _create_mainloop(self): ''' - Create GATT data - Register GATT Application + Create GLibMainLoop ''' - global gatt_app_obj, gatt_manager_iface_obj, gatt_app_registered - - gatt_app_obj = None - gatt_manager_iface_obj = None - gatt_app_registered = False - lib_gatt.GATT_APP_OBJ = False + if not self.mainloop: + self.mainloop = GLib.MainLoop() + def register_gatt_app(self): + ''' + Create Gatt Application + Register Gatt Application + ''' try: - gatt_app_obj = lib_gatt.Application(self.bus, self.adapter_path[0]) - gatt_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME,self.adapter_path[0]), GATT_MANAGER_IFACE) + # Create mainloop, if does not exist + self._create_mainloop() - gatt_manager_iface_obj.RegisterApplication(gatt_app_obj, {}, - reply_handler=self.gatt_app_handler, - error_handler=self.gatt_app_error_handler) - return True - except Exception: - print(traceback.format_exc()) - return False + # Create Gatt Application + self.gatt_app = lib_gatt.AlertNotificationApp(self.bus, self.adapter.path) + print('GATT Application created') + self.gatt_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), GATT_MANAGER_IFACE) - def gatt_app_handler(self): - ''' - GATT Application Register success handler - ''' - global gatt_app_registered - gatt_app_registered = True + # Register Gatt Application + self.gatt_mgr.RegisterApplication( + self.gatt_app, {}, + reply_handler=self.gatt_app_success_handler, + error_handler=self.gatt_app_error_handler) + self.mainloop.run() - def gatt_app_error_handler(self, error): - ''' - GATT Application Register error handler - ''' - global gatt_app_registered - gatt_app_registered = False + except dbus.exceptions.DBusException as dbus_err: + raise DBusException('Failed to create GATT Application : {}'.format(dbus_err)) + except Exception as err: + raise Exception('Failed to register Gatt Application: {}'.format(err)) - def start_advertising(self, adv_host_name, adv_iface_index, adv_type, adv_uuid): - ''' - Create Advertising data - Register Advertisement - Start Advertising - ''' - global le_adv_obj, le_adv_manager_iface_obj, adv_active_instance, adv_registered + def gatt_app_success_handler(self): + print('GATT Application successfully registered') + self.mainloop.quit() - le_adv_obj = None - le_adv_manager_iface_obj = None - le_adv_iface_path = None - adv_active_instance = False - adv_registered = False - lib_gap.ADV_OBJ = False + def gatt_app_error_handler(self): + raise DBusException('Failed to register GATT Application') + def check_le_iface(self): + ''' + Check if LEAdvertisingManager1 interface exists + ''' try: - print("Advertising started") - gatt_app_ret = self.create_and_reg_gatt_app() - - # Check if gatt app create and register command - # is sent successfully - assert gatt_app_ret - - GLib.timeout_add_seconds(2, self.verify_gatt_app_reg) - event_loop.run() - - # Check if Gatt Application is registered - assert gatt_app_registered - - for path,interface in self.ble_objs.items(): - if LE_ADVERTISING_MANAGER_IFACE in interface: + dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) + dbus_objs = dbus_obj_mgr.GetManagedObjects() + for path, iface in dbus_objs.items(): + if LE_ADVERTISING_MANAGER_IFACE in iface: le_adv_iface_path = path - + break # Check LEAdvertisingManager1 interface is found assert le_adv_iface_path, '\n Cannot start advertising. LEAdvertisingManager1 Interface not found' - # Get device when connected - if not self.device: - om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) - self.ble_objs = om_iface_obj.GetManagedObjects() + return le_adv_iface_path - for path, interfaces in self.ble_objs.items(): - if DEVICE_IFACE not in interfaces.keys(): - continue - device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE) - device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) - self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, path) + except AssertionError: + raise + except Exception as err: + raise Exception('Failed to find LEAdvertisingManager1 interface: {}'.format(err)) - le_adv_obj = lib_gap.Advertisement(self.bus, adv_iface_index, adv_type, adv_uuid, adv_host_name) - le_adv_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_adv_iface_path), LE_ADVERTISING_MANAGER_IFACE) + def register_adv(self, adv_host_name, adv_type, adv_uuid): + try: + # Gatt Application is expected to be registered + if not self.gatt_app: + print('No Gatt Application is registered') + return - le_adv_manager_iface_obj.RegisterAdvertisement(le_adv_obj.get_path(), {}, - reply_handler=self.adv_handler, - error_handler=self.adv_error_handler) + adv_iface_index = 0 - GLib.timeout_add_seconds(2, self.verify_adv_reg) - event_loop.run() + # Create mainloop, if does not exist + self._create_mainloop() - # Check if advertising is registered - assert adv_registered + # Check LEAdvertisingManager1 interface exists + le_iface_path = self.check_le_iface() - ret_val = self.verify_blecent_app() + # Create Advertisement data + leadv_obj = lib_gap.Advertisement( + self.bus, + adv_iface_index, + adv_type, + adv_uuid, + adv_host_name) + print('Advertisement registered') - # Check if blecent has passed - assert ret_val + # Register Advertisement + leadv_mgr_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_iface_path), LE_ADVERTISING_MANAGER_IFACE) + leadv_mgr_iface_obj.RegisterAdvertisement( + leadv_obj.get_path(), {}, + reply_handler=self.adv_success_handler, + error_handler=self.adv_error_handler) - except Exception: - print(traceback.format_exc()) - return False + # Handler to read events received and exit from mainloop + GLib.timeout_add_seconds(3, self.check_adv) - def verify_blecent_app(self): + self.mainloop.run() + + except AssertionError: + raise + except dbus.exceptions.DBusException as dbus_err: + raise DBusException('Failure during registering advertisement : {}'.format(dbus_err)) + except Exception as err: + raise Exception('Failure during registering advertisement : {}'.format(err)) + else: + try: + try: + # Stop Notify if not already stopped + chars = self.gatt_app.service.get_characteristics() + for char in chars: + if char.uuid == lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID']: + if char.notifying: + char.StopNotify() + except dbus.exceptions.DBusException as dbus_err: + print('Warning: {}'.format(dbus_err)) + + try: + # Unregister Advertisement + leadv_mgr_iface_obj.UnregisterAdvertisement(leadv_obj.get_path()) + except dbus.exceptions.DBusException as dbus_err: + print('Warning: {}'.format(dbus_err)) + + try: + # Remove advertising data + dbus.service.Object.remove_from_connection(leadv_obj) + except LookupError as err: + print('Warning: Failed to remove connection from dbus for advertisement object: {} - {}'.format(leadv_obj, err)) + + try: + # Unregister Gatt Application + self.gatt_mgr.UnregisterApplication(self.gatt_app.get_path()) + except dbus.exceptions.DBusException as dbus_err: + print('Warning: {}'.format(dbus_err)) + + try: + # Remove Gatt Application + dbus.service.Object.remove_from_connection(self.gatt_app) + except LookupError as err: + print('Warning: Failed to remove connection from dbus for Gatt application object: {} - {}'.format(self.gatt_app, err)) + + except RuntimeError as err: + print('Warning: Failure during cleanup of Advertisement: {}'.format(err)) + + def adv_success_handler(self): + print('Registered Advertisement successfully') + + def adv_error_handler(self, err): + raise DBusException('{}'.format(err)) + + def check_adv(self): ''' - Verify read/write/subscribe operations + Handler to check for events triggered (read/write/subscribe) + for advertisement registered for AlertNotificationApp ''' try: - GLib.timeout_add_seconds(2, self.verify_blecent) - event_loop.run() + retry = 10 + # Exit loop if read and write and subscribe is successful + if self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'], 'read') and \ + self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['ALERT_NOTIF_UUID'], 'write') and \ + self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'], 'notify'): + if self.mainloop.is_running(): + self.mainloop.quit() + # return False to stop polling + return False - return test_checks_pass + self.loop_cnt += 1 + print('Check read/write/subscribe events are received...Retry {}'.format(self.loop_cnt)) - except Exception: - print(traceback.format_exc()) - return False + # Exit loop if max retry value is reached and + # all three events (read and write and subscribe) have not yet passed + # Retry total 10 times + if self.loop_cnt == (retry - 1): + if self.mainloop.is_running(): + self.mainloop.quit() + # return False to stop polling + return False - def adv_handler(self): - ''' - Advertisement Register success handler - ''' - global adv_registered - adv_registered = True + # return True to continue polling + return True - def adv_error_handler(self, error): - ''' - Advertisement Register error handler - ''' - global adv_registered - adv_registered = False - - def verify_gatt_app_reg(self): - """ - Verify GATT Application is registered - """ - global gatt_app_retry_check_cnt, gatt_checks_done - gatt_app_retry_check_cnt = 0 - gatt_checks_done = False - - # Check for success - if lib_gatt.GATT_APP_OBJ: - print("GATT Data created") - if gatt_app_registered: - print("GATT Application registered") - gatt_checks_done = True - if gatt_app_retry_check_cnt == 20: - if not gatt_app_registered: - print("Failure: GATT Application could not be registered") - gatt_checks_done = True - - # End polling if app is registered or cnt has reached 10 - if gatt_checks_done: - if event_loop.is_running(): - event_loop.quit() - return False # polling for checks will stop - - gatt_app_retry_check_cnt += 1 - # Default return True - polling for checks will continue - return True - - def verify_adv_reg(self): - """ - Verify Advertisement is registered - """ - global adv_retry_check_cnt, adv_checks_done - adv_retry_check_cnt = 0 - adv_checks_done = False - - if lib_gap.ADV_OBJ: - print("Advertising data created") - if adv_registered or adv_active_instance: - print("Advertisement registered") - adv_checks_done = True - if adv_retry_check_cnt == 10: - if not adv_registered and not adv_active_instance: - print("Failure: Advertisement could not be registered") - adv_checks_done = True - - # End polling if success or cnt has reached 10 - if adv_checks_done: - if event_loop.is_running(): - event_loop.quit() - return False # polling for checks will stop - - adv_retry_check_cnt += 1 - # Default return True - polling for checks will continue - return True - - def verify_blecent(self): - """ - Verify blecent test commands are successful - """ - global blecent_retry_check_cnt, read_req_check, write_req_check,\ - subscribe_req_check, test_checks_pass - - # Check for failures after 10 retries - if blecent_retry_check_cnt == 10: - # check for failures - if not read_req_check: - print("Failure: Read Request not received") - if not write_req_check: - print("Failure: Write Request not received") - if not subscribe_req_check: - print("Failure: Subscribe Request not received") - - # Blecent Test failed - test_checks_pass = False - if subscribe_req_check: - lib_gatt.alert_status_char_obj.StopNotify() - else: - # Check for success - if not read_req_check and lib_gatt.CHAR_READ: - read_req_check = True - if not write_req_check and lib_gatt.CHAR_WRITE: - write_req_check = True - if not subscribe_req_check and lib_gatt.CHAR_SUBSCRIBE: - subscribe_req_check = True - - if read_req_check and write_req_check and subscribe_req_check: - # all checks passed - # Blecent Test passed - test_checks_pass = True - lib_gatt.alert_status_char_obj.StopNotify() - - if (blecent_retry_check_cnt == 10 or test_checks_pass): - if event_loop.is_running(): - event_loop.quit() - return False # polling for checks will stop - - # Increment retry count - blecent_retry_check_cnt += 1 - - # Default return True - polling for checks will continue - return True - - def verify_blecent_disconnect(self): - """ - Verify cleanup is successful - """ - global blecent_retry_check_cnt, gatt_app_obj_check, gatt_app_reg_check,\ - adv_data_check, adv_reg_check, adv_stop - - if blecent_retry_check_cnt == 0: - gatt_app_obj_check = False - gatt_app_reg_check = False - adv_data_check = False - adv_reg_check = False - - # Check for failures after 10 retries - if blecent_retry_check_cnt == 10: - # check for failures - if not gatt_app_obj_check: - print("Warning: GATT Data could not be removed") - if not gatt_app_reg_check: - print("Warning: GATT Application could not be unregistered") - if not adv_data_check: - print("Warning: Advertising data could not be removed") - if not adv_reg_check: - print("Warning: Advertisement could not be unregistered") - - # Blecent Test failed - adv_stop = False - else: - # Check for success - if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ: - print("GATT Data removed") - gatt_app_obj_check = True - if not gatt_app_reg_check and not gatt_app_registered: - print("GATT Application unregistered") - gatt_app_reg_check = True - if not adv_data_check and not adv_reg_check and not (adv_registered or adv_active_instance or lib_gap.ADV_OBJ): - print("Advertising data removed") - print("Advertisement unregistered") - adv_data_check = True - adv_reg_check = True - # all checks passed - adv_stop = True - - if (blecent_retry_check_cnt == 10 or adv_stop): - if event_loop.is_running(): - event_loop.quit() - return False # polling for checks will stop - - # Increment retry count - blecent_retry_check_cnt += 1 - - # Default return True - polling for checks will continue - return True + except RuntimeError as err: + print('Failure in advertisment handler: {}'.format(err)) + if self.mainloop.is_running(): + self.mainloop.quit() + # return False to stop polling + return False def disconnect(self): ''' - Before application exits: - Advertisement is unregistered - Advertisement object created is removed from dbus - GATT Application is unregistered - GATT Application object created is removed from dbus - Disconnect device if connected - Adapter is powered off + Disconnect device ''' try: - global blecent_retry_check_cnt, discovery_start, verify_signal_check, signal_caught - blecent_retry_check_cnt = 0 - verify_signal_check = 0 - - print("\nexiting from test...") - - self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) - - if adv_registered: - # Unregister Advertisement - le_adv_manager_iface_obj.UnregisterAdvertisement(le_adv_obj.get_path()) - - # Remove Advertising data - dbus.service.Object.remove_from_connection(le_adv_obj) - - if gatt_app_registered: - # Unregister GATT Application - gatt_manager_iface_obj.UnregisterApplication(gatt_app_obj.get_path()) - - # Remove GATT data - dbus.service.Object.remove_from_connection(gatt_app_obj) - - GLib.timeout_add_seconds(5, self.verify_blecent_disconnect) - event_loop.run() - - if adv_stop: - print("Stop Advertising status: ", adv_stop) - else: - print("Warning: Stop Advertising status: ", adv_stop) - + if not self.device or not self.device.iface: + return + print('disconnecting device') # Disconnect device - if self.device: - print("disconnecting device...") - self.device.Disconnect(dbus_interface=DEVICE_IFACE) - if self.adapter: - self.adapter.RemoveDevice(self.device) + device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected') + if device_conn == 1: + self.device.iface.Disconnect(dbus_interface=DEVICE_IFACE) + for cnt in range(10, 0, -1): + time.sleep(5) + device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected') + if device_conn == 0: + print('device disconnected') + break + print('number of retries left ({})'.format(cnt - 1)) + if device_conn == 1: + print('failed to disconnect device') + + self.adapter.iface.RemoveDevice(self.device.iface) self.device = None - if discovery_start: - self.adapter.StopDiscovery() - discovery_start = False - time.sleep(15) - signal_caught = False - GLib.timeout_add_seconds(5, verify_signal_is_caught) - event_loop.run() - - if not device_connected: - print("device disconnected") - else: - print("Warning: device could not be disconnected") - - except Exception: - print(traceback.format_exc()) - return False + except dbus.exceptions.DBusException as dbus_err: + print('Warning: {}'.format(dbus_err)) + except Exception as err: + raise Exception('Failed to disconnect device: {}'.format(err)) diff --git a/tools/ble/lib_gap.py b/tools/ble/lib_gap.py index 02466c7aa1..5c8ee7b2f1 100644 --- a/tools/ble/lib_gap.py +++ b/tools/ble/lib_gap.py @@ -18,6 +18,7 @@ # Register Advertisement from __future__ import print_function + import sys try: @@ -27,11 +28,10 @@ except ImportError as e: if 'linux' not in sys.platform: raise e print(e) - print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue") - print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue") + print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue') + print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue') raise -ADV_OBJ = False DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' @@ -74,10 +74,8 @@ class Advertisement(dbus.service.Object): in_signature='s', out_signature='a{sv}') def GetAll(self, interface): - global ADV_OBJ if interface != LE_ADVERTISEMENT_IFACE: raise InvalidArgsException() - ADV_OBJ = True return self.get_properties()[LE_ADVERTISEMENT_IFACE] @dbus.service.method(LE_ADVERTISEMENT_IFACE, diff --git a/tools/ble/lib_gatt.py b/tools/ble/lib_gatt.py index 4710e1549c..0daaee14c6 100644 --- a/tools/ble/lib_gatt.py +++ b/tools/ble/lib_gatt.py @@ -18,6 +18,7 @@ # Creating GATT Application which then becomes available to remote devices. from __future__ import print_function + import sys try: @@ -27,17 +28,10 @@ except ImportError as e: if 'linux' not in sys.platform: raise e print(e) - print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue") - print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue") + print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue') + print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue') raise -alert_status_char_obj = None - -GATT_APP_OBJ = False -CHAR_READ = False -CHAR_WRITE = False -CHAR_SUBSCRIBE = False - DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' GATT_MANAGER_IFACE = 'org.bluez.GattManager1' @@ -46,6 +40,21 @@ GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' +SERVICE_UUIDS = { + 'ALERT_NOTIF_SVC_UUID': '00001811-0000-1000-8000-00805f9b34fb' +} + +CHAR_UUIDS = { + 'SUPPORT_NEW_ALERT_UUID': '00002A47-0000-1000-8000-00805f9b34fb', + 'ALERT_NOTIF_UUID': '00002A44-0000-1000-8000-00805f9b34fb', + 'UNREAD_ALERT_STATUS_UUID': '00002A45-0000-1000-8000-00805f9b34fb' +} + +DESCR_UUIDS = { + 'CCCD_UUID': '00002902-0000-1000-8000-00805f9b34fb' +} + + class InvalidArgsException(dbus.exceptions.DBusException): _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' @@ -62,22 +71,16 @@ class Application(dbus.service.Object): def __init__(self, bus, path): self.path = path self.services = [] - srv_obj = AlertNotificationService(bus, '0001') - self.add_service(srv_obj) dbus.service.Object.__init__(self, bus, self.path) def __del__(self): pass - def get_path(self): - return dbus.ObjectPath(self.path) - def add_service(self, service): self.services.append(service) @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') def GetManagedObjects(self): - global GATT_APP_OBJ response = {} for service in self.services: @@ -89,13 +92,25 @@ class Application(dbus.service.Object): for desc in descs: response[desc.get_path()] = desc.get_properties() - GATT_APP_OBJ = True return response + def get_path(self): + return dbus.ObjectPath(self.path) + def Release(self): pass +class AlertNotificationApp(Application): + ''' + Alert Notification Application + ''' + def __init__(self, bus, path): + Application.__init__(self, bus, path) + self.service = AlertNotificationService(bus, '0001') + self.add_service(self.service) + + class Service(dbus.service.Object): """ org.bluez.GattService1 interface implementation @@ -190,7 +205,6 @@ class Characteristic(dbus.service.Object): def GetAll(self, interface): if interface != GATT_CHRC_IFACE: raise InvalidArgsException() - return self.get_properties()[GATT_CHRC_IFACE] @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay') @@ -216,7 +230,8 @@ class Characteristic(dbus.service.Object): @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}as') def PropertiesChanged(self, interface, changed, invalidated): - print("\nProperties Changed") + pass + # print('\nProperties Changed') class Descriptor(dbus.service.Object): @@ -249,7 +264,6 @@ class Descriptor(dbus.service.Object): def GetAll(self, interface): if interface != GATT_DESC_IFACE: raise InvalidArgsException() - return self.get_properties()[GATT_DESC_IFACE] @dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay') @@ -262,151 +276,155 @@ class Descriptor(dbus.service.Object): print('Default WriteValue called, returning error') raise NotSupportedException() + @dbus.service.signal(DBUS_PROP_IFACE, + signature='sa{sv}as') + def PropertiesChanged(self, interface, changed, invalidated): + pass + # print('\nProperties Changed') + class AlertNotificationService(Service): - TEST_SVC_UUID = '00001811-0000-1000-8000-00805f9b34fb' - def __init__(self, bus, index): - global alert_status_char_obj - Service.__init__(self, bus, index, self.TEST_SVC_UUID, primary=True) + Service.__init__(self, bus, index, SERVICE_UUIDS['ALERT_NOTIF_SVC_UUID'], primary=True) self.add_characteristic(SupportedNewAlertCategoryCharacteristic(bus, '0001', self)) self.add_characteristic(AlertNotificationControlPointCharacteristic(bus, '0002', self)) - alert_status_char_obj = UnreadAlertStatusCharacteristic(bus, '0003', self) - self.add_characteristic(alert_status_char_obj) + self.add_characteristic(UnreadAlertStatusCharacteristic(bus, '0003', self)) + + def get_char_status(self, uuid, status): + for char in self.characteristics: + if char.uuid == uuid: + if status in char.status: + return True + return False class SupportedNewAlertCategoryCharacteristic(Characteristic): - SUPPORT_NEW_ALERT_UUID = '00002A47-0000-1000-8000-00805f9b34fb' - def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, - self.SUPPORT_NEW_ALERT_UUID, + CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'], ['read'], service) - self.value = [dbus.Byte(2)] + self.status = [] def ReadValue(self, options): - global CHAR_READ - CHAR_READ = True val_list = [] for val in self.value: val_list.append(dbus.Byte(val)) - print("Read Request received\n", "\tSupportedNewAlertCategoryCharacteristic") - print("\tValue:", "\t", val_list) + print('Read Request received\n', '\tSupportedNewAlertCategoryCharacteristic') + print('\tValue:', '\t', val_list) + self.status.append('read') return val_list class AlertNotificationControlPointCharacteristic(Characteristic): - ALERT_NOTIF_UUID = '00002A44-0000-1000-8000-00805f9b34fb' - def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, - self.ALERT_NOTIF_UUID, - ['read','write'], + CHAR_UUIDS['ALERT_NOTIF_UUID'], + ['read', 'write'], service) - self.value = [dbus.Byte(0)] + self.status = [] def ReadValue(self, options): val_list = [] for val in self.value: val_list.append(dbus.Byte(val)) - print("Read Request received\n", "\tAlertNotificationControlPointCharacteristic") - print("\tValue:", "\t", val_list) + print('Read Request received\n', '\tAlertNotificationControlPointCharacteristic') + print('\tValue:', '\t', val_list) + self.status.append('read') return val_list def WriteValue(self, value, options): - global CHAR_WRITE - CHAR_WRITE = True - print("Write Request received\n", "\tAlertNotificationControlPointCharacteristic") - print("\tCurrent value:", "\t", self.value) + print('Write Request received\n', '\tAlertNotificationControlPointCharacteristic') + print('\tCurrent value:', '\t', self.value) val_list = [] for val in value: val_list.append(val) self.value = val_list + self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, []) # Check if new value is written - print("\tNew value:", "\t", self.value) + print('\tNew value:', '\t', self.value) if not self.value == value: - print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value) + print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value) + self.status.append('write') class UnreadAlertStatusCharacteristic(Characteristic): - UNREAD_ALERT_STATUS_UUID = '00002A45-0000-1000-8000-00805f9b34fb' - def __init__(self, bus, index, service): Characteristic.__init__( self, bus, index, - self.UNREAD_ALERT_STATUS_UUID, + CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'], ['read', 'write', 'notify'], service) self.value = [dbus.Byte(0)] self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self) self.add_descriptor(self.cccd_obj) self.notifying = False + self.status = [] def StartNotify(self): - global CHAR_SUBSCRIBE - CHAR_SUBSCRIBE = True + try: + if self.notifying: + print('\nAlready notifying, nothing to do') + return + print('Notify Started') + self.notifying = True + self.ReadValue() + self.WriteValue([dbus.Byte(1), dbus.Byte(0)]) + self.status.append('notify') - if self.notifying: - print('\nAlready notifying, nothing to do') - return - self.notifying = True - print("\nNotify Started") - self.cccd_obj.WriteValue([dbus.Byte(1), dbus.Byte(0)]) - self.cccd_obj.ReadValue() + except Exception as e: + print(e) def StopNotify(self): if not self.notifying: print('\nNot notifying, nothing to do') return self.notifying = False - print("\nNotify Stopped") + print('\nNotify Stopped') - def ReadValue(self, options): - print("Read Request received\n", "\tUnreadAlertStatusCharacteristic") + def ReadValue(self, options=None): val_list = [] for val in self.value: val_list.append(dbus.Byte(val)) - print("\tValue:", "\t", val_list) + self.status.append('read') + print('\tValue:', '\t', val_list) return val_list - def WriteValue(self, value, options): - print("Write Request received\n", "\tUnreadAlertStatusCharacteristic") + def WriteValue(self, value, options=None): val_list = [] for val in value: val_list.append(val) self.value = val_list + self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, []) # Check if new value is written - print("\tNew value:", "\t", self.value) if not self.value == value: - print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value) + print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value) + print('New value:', '\t', self.value) + self.status.append('write') class ClientCharacteristicConfigurationDescriptor(Descriptor): - CCCD_UUID = '00002902-0000-1000-8000-00805f9b34fb' - def __init__(self, bus, index, characteristic): - self.value = [dbus.Byte(0)] + self.value = [dbus.Byte(1)] Descriptor.__init__( self, bus, index, - self.CCCD_UUID, + DESCR_UUIDS['CCCD_UUID'], ['read', 'write'], characteristic) - def ReadValue(self): - print("\tValue on read:", "\t", self.value) + def ReadValue(self, options=None): return self.value - def WriteValue(self, value): + def WriteValue(self, value, options=None): val_list = [] for val in value: val_list.append(val) self.value = val_list + self.PropertiesChanged(GATT_DESC_IFACE, {'Value': self.value}, []) # Check if new value is written - print("New value on write:", "\t", self.value) if not self.value == value: - print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value) + print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value) diff --git a/tools/ci/python_packages/wifi_tools.py b/tools/ci/python_packages/wifi_tools.py index b926efb55c..dc8c496e21 100644 --- a/tools/ci/python_packages/wifi_tools.py +++ b/tools/ci/python_packages/wifi_tools.py @@ -13,10 +13,11 @@ # limitations under the License. # +import time + import dbus import dbus.mainloop.glib import netifaces -import time def get_wiface_name(): @@ -43,25 +44,30 @@ class wpa_cli: self.new_network = None self.connected = False self.reset_on_exit = reset_on_exit - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - bus = dbus.SystemBus() - service = dbus.Interface(bus.get_object("fi.w1.wpa_supplicant1", "/fi/w1/wpa_supplicant1"), - "fi.w1.wpa_supplicant1") - iface_path = service.GetInterface(self.iface_name) - self.iface_obj = bus.get_object("fi.w1.wpa_supplicant1", iface_path) - self.iface_ifc = dbus.Interface(self.iface_obj, "fi.w1.wpa_supplicant1.Interface") - self.iface_props = dbus.Interface(self.iface_obj, 'org.freedesktop.DBus.Properties') - if self.iface_ifc is None: - raise RuntimeError('supplicant : Failed to fetch interface') + try: + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + bus = dbus.SystemBus() - self.old_network = self._get_iface_property("CurrentNetwork") - print("Old network is %s" % self.old_network) + service = dbus.Interface(bus.get_object('fi.w1.wpa_supplicant1', '/fi/w1/wpa_supplicant1'), + 'fi.w1.wpa_supplicant1') + iface_path = service.GetInterface(self.iface_name) + self.iface_obj = bus.get_object('fi.w1.wpa_supplicant1', iface_path) + self.iface_ifc = dbus.Interface(self.iface_obj, 'fi.w1.wpa_supplicant1.Interface') + self.iface_props = dbus.Interface(self.iface_obj, 'org.freedesktop.DBus.Properties') + if self.iface_ifc is None: + raise RuntimeError('supplicant : Failed to fetch interface') - if self.old_network == '/': - self.old_network = None - else: - self.connected = True + self.old_network = self._get_iface_property('CurrentNetwork') + print('Old network is %s' % self.old_network) + + if self.old_network == '/': + self.old_network = None + else: + self.connected = True + + except Exception as err: + raise Exception('Failure in wpa_cli init: {}'.format(err)) def _get_iface_property(self, name): """ Read the property with 'name' from the wi-fi interface object @@ -69,40 +75,44 @@ class wpa_cli: Note: The result is a dbus wrapped type, so should usually convert it to the corresponding native Python type """ - return self.iface_props.Get("fi.w1.wpa_supplicant1.Interface", name) + return self.iface_props.Get('fi.w1.wpa_supplicant1.Interface', name) def connect(self, ssid, password): - if self.connected is True: - self.iface_ifc.Disconnect() - self.connected = False + try: + if self.connected is True: + self.iface_ifc.Disconnect() + self.connected = False - if self.new_network is not None: - self.iface_ifc.RemoveNetwork(self.new_network) + if self.new_network is not None: + self.iface_ifc.RemoveNetwork(self.new_network) - print("Pre-connect state is %s, IP is %s" % (self._get_iface_property("State"), get_wiface_IPv4(self.iface_name))) + print('Pre-connect state is %s, IP is %s' % (self._get_iface_property('State'), get_wiface_IPv4(self.iface_name))) - self.new_network = self.iface_ifc.AddNetwork({"ssid": ssid, "psk": password}) - self.iface_ifc.SelectNetwork(self.new_network) + self.new_network = self.iface_ifc.AddNetwork({'ssid': ssid, 'psk': password}) + self.iface_ifc.SelectNetwork(self.new_network) + time.sleep(10) - ip = None - retry = 10 - while retry > 0: - time.sleep(5) + ip = None + retry = 10 + while retry > 0: + time.sleep(5) + state = str(self._get_iface_property('State')) + print('wpa iface state %s (scanning %s)' % (state, bool(self._get_iface_property('Scanning')))) + if state in ['disconnected', 'inactive']: + self.iface_ifc.Reconnect() + ip = get_wiface_IPv4(self.iface_name) + print('wpa iface %s IP %s' % (self.iface_name, ip)) + if ip is not None: + self.connected = True + return ip + retry -= 1 + time.sleep(3) - state = str(self._get_iface_property("State")) - print("wpa iface state %s (scanning %s)" % (state, bool(self._get_iface_property("Scanning")))) - if state in ["disconnected", "inactive"]: - self.iface_ifc.Reconnect() + self.reset() + print('wpa_cli : Connection failed') - ip = get_wiface_IPv4(self.iface_name) - print("wpa iface %s IP %s" % (self.iface_name, ip)) - if ip is not None: - self.connected = True - return ip - retry -= 1 - - self.reset() - raise RuntimeError('wpa_cli : Connection failed') + except Exception as err: + raise Exception('Failure in wpa_cli init: {}'.format(err)) def reset(self): if self.iface_ifc is not None: diff --git a/tools/esp_prov/transport/ble_cli.py b/tools/esp_prov/transport/ble_cli.py index 5d88ece806..90689208bd 100644 --- a/tools/esp_prov/transport/ble_cli.py +++ b/tools/esp_prov/transport/ble_cli.py @@ -14,12 +14,12 @@ # from __future__ import print_function -from builtins import input -from future.utils import iteritems import platform +from builtins import input import utils +from future.utils import iteritems fallback = True @@ -28,9 +28,10 @@ fallback = True # else fallback to console mode if platform.system() == 'Linux': try: + import time + import dbus import dbus.mainloop.glib - import time fallback = False except ImportError: pass @@ -41,13 +42,15 @@ if platform.system() == 'Linux': # BLE client (Linux Only) using Bluez and DBus class BLE_Bluez_Client: + def __init__(self): + self.adapter_props = None + def connect(self, devname, iface, chrc_names, fallback_srv_uuid): self.devname = devname self.srv_uuid_fallback = fallback_srv_uuid self.chrc_names = [name.lower() for name in chrc_names] self.device = None self.adapter = None - self.adapter_props = None self.services = None self.nu_lookup = None self.characteristics = dict() @@ -55,33 +58,69 @@ class BLE_Bluez_Client: dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") + manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager') objects = manager.GetManagedObjects() - + adapter_path = None for path, interfaces in iteritems(objects): - adapter = interfaces.get("org.bluez.Adapter1") + adapter = interfaces.get('org.bluez.Adapter1') if adapter is not None: if path.endswith(iface): - self.adapter = dbus.Interface(bus.get_object("org.bluez", path), "org.bluez.Adapter1") - self.adapter_props = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties") + self.adapter = dbus.Interface(bus.get_object('org.bluez', path), 'org.bluez.Adapter1') + self.adapter_props = dbus.Interface(bus.get_object('org.bluez', path), 'org.freedesktop.DBus.Properties') + adapter_path = path break if self.adapter is None: - raise RuntimeError("Bluetooth adapter not found") + raise RuntimeError('Bluetooth adapter not found') - self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) - self.adapter.StartDiscovery() + # Power on bluetooth adapter + self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1)) + print('checking if adapter is powered on') + for cnt in range(10, 0, -1): + time.sleep(5) + powered_on = self.adapter_props.Get('org.bluez.Adapter1', 'Powered') + if powered_on == 1: + # Set adapter props again with powered on value + self.adapter_props = dbus.Interface(bus.get_object('org.bluez', adapter_path), 'org.freedesktop.DBus.Properties') + print('bluetooth adapter powered on') + break + print('number of retries left({})'.format(cnt - 1)) + if powered_on == 0: + raise RuntimeError('Failed to starte bluetooth adapter') + + # Start discovery if not already discovering + started_discovery = 0 + discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering') + if discovery_val == 0: + print('starting discovery') + self.adapter.StartDiscovery() + # Set as start discovery is called + started_discovery = 1 + for cnt in range(10, 0, -1): + time.sleep(5) + discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering') + if discovery_val == 1: + print('start discovery successful') + break + print('number of retries left ({})'.format(cnt - 1)) + + if discovery_val == 0: + print('start discovery failed') + raise RuntimeError('Failed to start discovery') retry = 10 while (retry > 0): try: if self.device is None: - print("Connecting...") + print('Connecting...') # Wait for device to be discovered time.sleep(5) - self._connect_() - print("Connected") - print("Getting Services...") + connected = self._connect_() + if connected: + print('Connected') + else: + return False + print('Getting Services...') # Wait for services to be discovered time.sleep(5) self._get_services_() @@ -89,28 +128,42 @@ class BLE_Bluez_Client: except Exception as e: print(e) retry -= 1 - print("Retries left", retry) + print('Retries left', retry) continue - self.adapter.StopDiscovery() + + # Call StopDiscovery() for corresponding StartDiscovery() session + if started_discovery == 1: + print('stopping discovery') + self.adapter.StopDiscovery() + for cnt in range(10, 0, -1): + time.sleep(5) + discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering') + if discovery_val == 0: + print('stop discovery successful') + break + print('number of retries left ({})'.format(cnt - 1)) + if discovery_val == 1: + print('stop discovery failed') + return False def _connect_(self): bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") + manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager') objects = manager.GetManagedObjects() dev_path = None for path, interfaces in iteritems(objects): - if "org.bluez.Device1" not in interfaces: + if 'org.bluez.Device1' not in interfaces: continue - if interfaces["org.bluez.Device1"].get("Name") == self.devname: + if interfaces['org.bluez.Device1'].get('Name') == self.devname: dev_path = path break if dev_path is None: - raise RuntimeError("BLE device not found") + raise RuntimeError('BLE device not found') try: - self.device = bus.get_object("org.bluez", dev_path) + self.device = bus.get_object('org.bluez', dev_path) try: uuids = self.device.Get('org.bluez.Device1', 'UUIDs', dbus_interface='org.freedesktop.DBus.Properties') @@ -122,25 +175,42 @@ class BLE_Bluez_Client: if len(uuids) == 1: self.srv_uuid_adv = uuids[0] except dbus.exceptions.DBusException as e: - print(e) + raise RuntimeError(e) self.device.Connect(dbus_interface='org.bluez.Device1') + # Check device is connected successfully + for cnt in range(10, 0, -1): + time.sleep(5) + device_conn = self.device.Get( + 'org.bluez.Device1', + 'Connected', + dbus_interface='org.freedesktop.DBus.Properties') + if device_conn == 1: + print('device is connected') + break + print('number of retries left ({})'.format(cnt - 1)) + if device_conn == 0: + print('failed to connect device') + return False + + return True + except Exception as e: print(e) self.device = None - raise RuntimeError("BLE device could not connect") + raise RuntimeError('BLE device could not connect') def _get_services_(self): bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") + manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager') objects = manager.GetManagedObjects() service_found = False for srv_path, srv_interfaces in iteritems(objects): - if "org.bluez.GattService1" not in srv_interfaces: + if 'org.bluez.GattService1' not in srv_interfaces: continue if not srv_path.startswith(self.device.object_path): continue - service = bus.get_object("org.bluez", srv_path) + service = bus.get_object('org.bluez', srv_path) srv_uuid = service.Get('org.bluez.GattService1', 'UUID', dbus_interface='org.freedesktop.DBus.Properties') @@ -152,28 +222,28 @@ class BLE_Bluez_Client: nu_lookup = dict() characteristics = dict() for chrc_path, chrc_interfaces in iteritems(objects): - if "org.bluez.GattCharacteristic1" not in chrc_interfaces: + if 'org.bluez.GattCharacteristic1' not in chrc_interfaces: continue if not chrc_path.startswith(service.object_path): continue - chrc = bus.get_object("org.bluez", chrc_path) + chrc = bus.get_object('org.bluez', chrc_path) uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID', dbus_interface='org.freedesktop.DBus.Properties') characteristics[uuid] = chrc for desc_path, desc_interfaces in iteritems(objects): - if "org.bluez.GattDescriptor1" not in desc_interfaces: + if 'org.bluez.GattDescriptor1' not in desc_interfaces: continue if not desc_path.startswith(chrc.object_path): continue - desc = bus.get_object("org.bluez", desc_path) + desc = bus.get_object('org.bluez', desc_path) desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID', dbus_interface='org.freedesktop.DBus.Properties') if desc_uuid[4:8] != '2901': continue try: readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1') - except dbus.exceptions.DBusException: - break + except dbus.exceptions.DBusException as err: + raise RuntimeError('Failed to read value for descriptor while getting services - {}'.format(err)) found_name = ''.join(chr(b) for b in readval).lower() nu_lookup[found_name] = uuid break @@ -200,12 +270,14 @@ class BLE_Bluez_Client: if not service_found: self.device.Disconnect(dbus_interface='org.bluez.Device1') + # Check if device is disconnected successfully + self._check_device_disconnected() if self.adapter: self.adapter.RemoveDevice(self.device) self.device = None self.nu_lookup = None self.characteristics = dict() - raise RuntimeError("Provisioning service not found") + raise RuntimeError('Provisioning service not found') def get_nu_lookup(self): return self.nu_lookup @@ -218,31 +290,47 @@ class BLE_Bluez_Client: def disconnect(self): if self.device: self.device.Disconnect(dbus_interface='org.bluez.Device1') + # Check if device is disconnected successfully + self._check_device_disconnected() if self.adapter: self.adapter.RemoveDevice(self.device) self.device = None self.nu_lookup = None self.characteristics = dict() if self.adapter_props: - self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(0)) + self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(0)) + + def _check_device_disconnected(self): + for cnt in range(10, 0, -1): + time.sleep(5) + device_conn = self.device.Get( + 'org.bluez.Device1', + 'Connected', + dbus_interface='org.freedesktop.DBus.Properties') + if device_conn == 0: + print('device disconnected') + break + print('number of retries left ({})'.format(cnt - 1)) + if device_conn == 1: + print('failed to disconnect device') def send_data(self, characteristic_uuid, data): try: path = self.characteristics[characteristic_uuid] except KeyError: - raise RuntimeError("Invalid characteristic : " + characteristic_uuid) + raise RuntimeError('Invalid characteristic : ' + characteristic_uuid) try: path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1') except TypeError: # python3 compatible path.WriteValue([c for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1') except dbus.exceptions.DBusException as e: - raise RuntimeError("Failed to write value to characteristic " + characteristic_uuid + ": " + str(e)) + raise RuntimeError('Failed to write value to characteristic ' + characteristic_uuid + ': ' + str(e)) try: readval = path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1') except dbus.exceptions.DBusException as e: - raise RuntimeError("Failed to read value from characteristic " + characteristic_uuid + ": " + str(e)) + raise RuntimeError('Failed to read value from characteristic ' + characteristic_uuid + ': ' + str(e)) return ''.join(chr(b) for b in readval) @@ -252,14 +340,14 @@ class BLE_Bluez_Client: # Console based BLE client for Cross Platform support class BLE_Console_Client: def connect(self, devname, iface, chrc_names, fallback_srv_uuid): - print("BLE client is running in console mode") - print("\tThis could be due to your platform not being supported or dependencies not being met") - print("\tPlease ensure all pre-requisites are met to run the full fledged client") - print("BLECLI >> Please connect to BLE device `" + devname + "` manually using your tool of choice") - resp = input("BLECLI >> Was the device connected successfully? [y/n] ") + print('BLE client is running in console mode') + print('\tThis could be due to your platform not being supported or dependencies not being met') + print('\tPlease ensure all pre-requisites are met to run the full fledged client') + print('BLECLI >> Please connect to BLE device `' + devname + '` manually using your tool of choice') + resp = input('BLECLI >> Was the device connected successfully? [y/n] ') if resp != 'Y' and resp != 'y': return False - print("BLECLI >> List available attributes of the connected device") + print('BLECLI >> List available attributes of the connected device') resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ") if resp != 'Y' and resp != 'y': return False @@ -279,9 +367,9 @@ class BLE_Console_Client: def send_data(self, characteristic_uuid, data): print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :") - print("\t>> " + utils.str_to_hexstr(data)) - print("BLECLI >> Enter data read from characteristic (in hex) :") - resp = input("\t<< ") + print('\t>> ' + utils.str_to_hexstr(data)) + print('BLECLI >> Enter data read from characteristic (in hex) :') + resp = input('\t<< ') return utils.hexstr_to_str(resp) diff --git a/tools/esp_prov/transport/transport_http.py b/tools/esp_prov/transport/transport_http.py index c686128828..bac6b0cfe1 100644 --- a/tools/esp_prov/transport/transport_http.py +++ b/tools/esp_prov/transport/transport_http.py @@ -14,43 +14,47 @@ # from __future__ import print_function -from future.utils import tobytes import socket -import http.client -import ssl + +from future.utils import tobytes + +try: + from http.client import HTTPConnection, HTTPSConnection +except ImportError: + # Python 2 fallback + from httplib import HTTPConnection, HTTPSConnection from .transport import Transport class Transport_HTTP(Transport): - def __init__(self, hostname, certfile=None): + def __init__(self, hostname, ssl_context=None): try: socket.gethostbyname(hostname.split(':')[0]) except socket.gaierror: - raise RuntimeError("Unable to resolve hostname :" + hostname) + raise RuntimeError('Unable to resolve hostname :' + hostname) - if certfile is None: - self.conn = http.client.HTTPConnection(hostname, timeout=45) + if ssl_context is None: + self.conn = HTTPConnection(hostname, timeout=60) else: - ssl_ctx = ssl.create_default_context(cafile=certfile) - self.conn = http.client.HTTPSConnection(hostname, context=ssl_ctx, timeout=45) + self.conn = HTTPSConnection(hostname, context=ssl_context, timeout=60) try: - print("Connecting to " + hostname) + print('Connecting to ' + hostname) self.conn.connect() except Exception as err: - raise RuntimeError("Connection Failure : " + str(err)) - self.headers = {"Content-type": "application/x-www-form-urlencoded","Accept": "text/plain"} + raise RuntimeError('Connection Failure : ' + str(err)) + self.headers = {'Content-type': 'application/x-www-form-urlencoded','Accept': 'text/plain'} def _send_post_request(self, path, data): try: - self.conn.request("POST", path, tobytes(data), self.headers) + self.conn.request('POST', path, tobytes(data), self.headers) response = self.conn.getresponse() if response.status == 200: return response.read().decode('latin-1') except Exception as err: - raise RuntimeError("Connection Failure : " + str(err)) - raise RuntimeError("Server responded with error code " + str(response.status)) + raise RuntimeError('Connection Failure : ' + str(err)) + raise RuntimeError('Server responded with error code ' + str(response.status)) def send_data(self, ep_name, data): return self._send_post_request('/' + ep_name, data)