# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import platform
from builtins import input

import utils
from future.utils import iteritems

fallback = True


# Check if platform is Linux and required packages are installed
# else fallback to console mode
if platform.system() == 'Linux':
    try:
        import time

        import dbus
        import dbus.mainloop.glib
        fallback = False
    except ImportError:
        # Deliberate: without the D-Bus bindings the console client is used
        pass


# --------------------------------------------------------------------


# BLE client (Linux Only) using Bluez and DBus
class BLE_Bluez_Client:
    """BLE client that drives BlueZ through its D-Bus object tree.

    Every state poll in this class sleeps 5 seconds before each of up to
    10 checks, so individual steps may block for up to 50 seconds.
    """

    def __init__(self):
        # Initialise every attribute read by the other methods so that
        # disconnect(), get_nu_lookup() and has_characteristic() are safe
        # to call even if connect() was never invoked.
        self.adapter_props = None
        self.devname = None
        self.srv_uuid_fallback = None
        self.chrc_names = []
        self.device = None
        self.adapter = None
        self.services = None
        self.nu_lookup = None
        self.characteristics = dict()
        self.srv_uuid_adv = None

    def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
        """Power on the adapter, discover and connect to the named device.

        devname           -- value compared against the BlueZ device `Name`
        iface             -- suffix of the adapter's D-Bus object path
                             (presumably something like 'hci0')
        chrc_names        -- endpoint names expected in the characteristic
                             user descriptions (compared in lower case)
        fallback_srv_uuid -- service UUID accepted when the advertised UUID
                             does not match

        Returns True once the device is connected and its services have
        been enumerated, False when the device does not report itself as
        connected or when all 10 attempts failed.

        Raises RuntimeError if the adapter is not found, cannot be powered
        on, or discovery cannot be started.
        """
        self.devname = devname
        self.srv_uuid_fallback = fallback_srv_uuid
        self.chrc_names = [name.lower() for name in chrc_names]
        self.device = None
        self.adapter = None
        self.services = None
        self.nu_lookup = None
        self.characteristics = dict()
        self.srv_uuid_adv = None

        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()
        manager = dbus.Interface(bus.get_object('org.bluez', '/'),
                                 'org.freedesktop.DBus.ObjectManager')
        objects = manager.GetManagedObjects()
        adapter_path = None

        for path, interfaces in iteritems(objects):
            adapter = interfaces.get('org.bluez.Adapter1')
            if adapter is not None:
                if path.endswith(iface):
                    self.adapter = dbus.Interface(bus.get_object('org.bluez', path),
                                                  'org.bluez.Adapter1')
                    self.adapter_props = dbus.Interface(bus.get_object('org.bluez', path),
                                                        'org.freedesktop.DBus.Properties')
                    adapter_path = path
                    break

        if self.adapter is None:
            raise RuntimeError('Bluetooth adapter not found')

        # Power on bluetooth adapter
        self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
        print('checking if adapter is powered on')
        for cnt in range(10, 0, -1):
            time.sleep(5)
            powered_on = self.adapter_props.Get('org.bluez.Adapter1', 'Powered')
            if powered_on == 1:
                # Set adapter props again with powered on value
                self.adapter_props = dbus.Interface(bus.get_object('org.bluez', adapter_path),
                                                    'org.freedesktop.DBus.Properties')
                print('bluetooth adapter powered on')
                break
            print('number of retries left({})'.format(cnt - 1))
        if powered_on == 0:
            raise RuntimeError('Failed to start bluetooth adapter')

        # Start discovery if not already discovering
        started_discovery = 0
        discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
        if discovery_val == 0:
            print('starting discovery')
            self.adapter.StartDiscovery()
            # Set as start discovery is called
            started_discovery = 1
            for cnt in range(10, 0, -1):
                time.sleep(5)
                discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
                if discovery_val == 1:
                    print('start discovery successful')
                    break
                print('number of retries left ({})'.format(cnt - 1))

            if discovery_val == 0:
                print('start discovery failed')
                raise RuntimeError('Failed to start discovery')

        # NOTE(review): both `return` statements inside this loop leave the
        # method without calling StopDiscovery(); discovery is only stopped
        # below once every retry has failed. Confirm this is intended.
        retry = 10
        while (retry > 0):
            try:
                if self.device is None:
                    print('Connecting...')
                    # Wait for device to be discovered
                    time.sleep(5)
                    connected = self._connect_()
                    if connected:
                        print('Connected')
                    else:
                        return False
                print('Getting Services...')
                # Wait for services to be discovered
                time.sleep(5)
                self._get_services_()
                return True
            except Exception as e:
                # Any failure consumes one retry; the error is only printed
                print(e)
                retry -= 1
                print('Retries left', retry)
                continue

        # Call StopDiscovery() for corresponding StartDiscovery() session
        if started_discovery == 1:
            print('stopping discovery')
            self.adapter.StopDiscovery()
            for cnt in range(10, 0, -1):
                time.sleep(5)
                discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
                if discovery_val == 0:
                    print('stop discovery successful')
                    break
                print('number of retries left ({})'.format(cnt - 1))
            if discovery_val == 1:
                print('stop discovery failed')

        return False

    def _connect_(self):
        """Locate the device whose `Name` equals self.devname and connect.

        Returns True when the device reports `Connected`, False when it
        still reports not connected after polling.

        Raises RuntimeError if no matching device object exists or if any
        step of the connection raises.
        """
        bus = dbus.SystemBus()
        manager = dbus.Interface(bus.get_object('org.bluez', '/'),
                                 'org.freedesktop.DBus.ObjectManager')
        objects = manager.GetManagedObjects()
        dev_path = None
        for path, interfaces in iteritems(objects):
            if 'org.bluez.Device1' not in interfaces:
                continue
            if interfaces['org.bluez.Device1'].get('Name') == self.devname:
                dev_path = path
                break

        if dev_path is None:
            raise RuntimeError('BLE device not found')

        try:
            self.device = bus.get_object('org.bluez', dev_path)
            try:
                uuids = self.device.Get('org.bluez.Device1', 'UUIDs',
                                        dbus_interface='org.freedesktop.DBus.Properties')
                # There should be 1 service UUID in advertising data
                # If bluez had cached an old version of the advertisement data
                # the list of uuids may be incorrect, in which case connection
                # or service discovery may fail the first time. If that happens
                # the cache will be refreshed before next retry
                if len(uuids) == 1:
                    self.srv_uuid_adv = uuids[0]
            except dbus.exceptions.DBusException as e:
                raise RuntimeError(e)

            self.device.Connect(dbus_interface='org.bluez.Device1')
            # Check device is connected successfully
            for cnt in range(10, 0, -1):
                time.sleep(5)
                device_conn = self.device.Get(
                    'org.bluez.Device1',
                    'Connected',
                    dbus_interface='org.freedesktop.DBus.Properties')
                if device_conn == 1:
                    print('device is connected')
                    break
                print('number of retries left ({})'.format(cnt - 1))
            if device_conn == 0:
                print('failed to connect device')
                return False

            return True

        except Exception as e:
            # Forget the device so that the caller's retry loop reconnects
            print(e)
            self.device = None
            raise RuntimeError('BLE device could not connect')

    def _get_services_(self):
        """Enumerate the GATT services, characteristics and descriptors.

        Only services whose UUID equals the advertised UUID or the fallback
        UUID are considered. On success self.characteristics maps
        characteristic UUID to its D-Bus proxy object, and self.nu_lookup
        maps lower-cased descriptor text to characteristic UUID (or is None
        when some name in self.chrc_names is missing).

        Raises RuntimeError if a matching descriptor cannot be read or if
        no matching service exists; in the latter case the device is
        disconnected and removed first.
        """
        bus = dbus.SystemBus()
        manager = dbus.Interface(bus.get_object('org.bluez', '/'),
                                 'org.freedesktop.DBus.ObjectManager')
        objects = manager.GetManagedObjects()
        service_found = False
        for srv_path, srv_interfaces in iteritems(objects):
            if 'org.bluez.GattService1' not in srv_interfaces:
                continue
            if not srv_path.startswith(self.device.object_path):
                continue
            service = bus.get_object('org.bluez', srv_path)
            srv_uuid = service.Get('org.bluez.GattService1', 'UUID',
                                   dbus_interface='org.freedesktop.DBus.Properties')

            # If service UUID doesn't match the one found in advertisement data
            # then also check if it matches the fallback UUID
            if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]:
                continue

            nu_lookup = dict()
            characteristics = dict()
            for chrc_path, chrc_interfaces in iteritems(objects):
                if 'org.bluez.GattCharacteristic1' not in chrc_interfaces:
                    continue
                if not chrc_path.startswith(service.object_path):
                    continue
                chrc = bus.get_object('org.bluez', chrc_path)
                uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
                                dbus_interface='org.freedesktop.DBus.Properties')
                characteristics[uuid] = chrc
                for desc_path, desc_interfaces in iteritems(objects):
                    if 'org.bluez.GattDescriptor1' not in desc_interfaces:
                        continue
                    if not desc_path.startswith(chrc.object_path):
                        continue
                    desc = bus.get_object('org.bluez', desc_path)
                    desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID',
                                         dbus_interface='org.freedesktop.DBus.Properties')
                    # Only descriptors whose UUID carries '2901' at [4:8]
                    # are read; their value is used as the endpoint name
                    if desc_uuid[4:8] != '2901':
                        continue
                    try:
                        readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1')
                    except dbus.exceptions.DBusException as err:
                        raise RuntimeError('Failed to read value for descriptor while getting services - {}'.format(err))
                    found_name = ''.join(chr(b) for b in readval).lower()
                    nu_lookup[found_name] = uuid
                    break

            # Create lookup table only if all endpoint names found
            match_found = all(name in nu_lookup for name in self.chrc_names)
            self.nu_lookup = nu_lookup if match_found else None
            self.characteristics = characteristics
            service_found = True

            # If the service UUID matches that in the advertisement
            # we can stop the search now. If it doesn't match, we
            # have found the service corresponding to the fallback
            # UUID, in which case don't break and keep searching
            # for the advertised service
            if srv_uuid == self.srv_uuid_adv:
                break

        if not service_found:
            self.device.Disconnect(dbus_interface='org.bluez.Device1')
            # Check if device is disconnected successfully
            self._check_device_disconnected()
            if self.adapter:
                self.adapter.RemoveDevice(self.device)
            self.device = None
            self.nu_lookup = None
            self.characteristics = dict()
            raise RuntimeError('Provisioning service not found')

    def get_nu_lookup(self):
        """Return the name-to-UUID lookup table, or None if unavailable."""
        return self.nu_lookup

    def has_characteristic(self, uuid):
        """Return True if `uuid` is among the discovered characteristics."""
        return uuid in self.characteristics

    def disconnect(self):
        """Disconnect and remove the device, then power off the adapter.

        Does nothing for the parts that were never set up, so it is safe
        to call on a client that has not connected.
        """
        if self.device:
            self.device.Disconnect(dbus_interface='org.bluez.Device1')
            # Check if device is disconnected successfully
            self._check_device_disconnected()
            if self.adapter:
                self.adapter.RemoveDevice(self.device)
            self.device = None
            self.nu_lookup = None
            self.characteristics = dict()
        if self.adapter_props:
            self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(0))

    def _check_device_disconnected(self):
        """Poll the device `Connected` property until it reads 0.

        Only prints the outcome; a device that stays connected is reported
        but no exception is raised.
        """
        for cnt in range(10, 0, -1):
            time.sleep(5)
            device_conn = self.device.Get(
                'org.bluez.Device1',
                'Connected',
                dbus_interface='org.freedesktop.DBus.Properties')
            if device_conn == 0:
                print('device disconnected')
                break
            print('number of retries left ({})'.format(cnt - 1))
        if device_conn == 1:
            print('failed to disconnect device')

    def send_data(self, characteristic_uuid, data):
        """Write `data` to a characteristic and return what is read back.

        characteristic_uuid -- key into the discovered characteristics
        data                -- sequence of characters, or of integers
                               (e.g. bytes on Python 3)

        Returns the value read from the characteristic as a string built
        with chr() from each returned byte.

        Raises RuntimeError if the UUID is unknown or if the write or the
        read fails with a D-Bus exception.
        """
        try:
            chrc = self.characteristics[characteristic_uuid]
        except KeyError:
            raise RuntimeError('Invalid characteristic : ' + characteristic_uuid)

        # Build the payload before touching D-Bus so that the single write
        # below is fully covered by the DBusException handler and a
        # TypeError raised by the D-Bus call cannot trigger a second write.
        try:
            payload = [ord(c) for c in data]
        except TypeError:  # python3 compatible
            payload = list(data)

        try:
            chrc.WriteValue(payload, {}, dbus_interface='org.bluez.GattCharacteristic1')
        except dbus.exceptions.DBusException as e:
            raise RuntimeError('Failed to write value to characteristic ' + characteristic_uuid + ': ' + str(e))

        try:
            readval = chrc.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1')
        except dbus.exceptions.DBusException as e:
            raise RuntimeError('Failed to read value from characteristic ' + characteristic_uuid + ': ' + str(e))
        return ''.join(chr(b) for b in readval)


# --------------------------------------------------------------------


# Console based BLE client for Cross Platform support
class BLE_Console_Client:
    """Interactive stand-in that asks the user to perform each BLE step."""

    def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
        """Ask the user to connect manually; return True if both answers are yes.

        `iface` and `chrc_names` are accepted for interface parity with
        BLE_Bluez_Client.connect and are not used here.
        """
        print('BLE client is running in console mode')
        print('\tThis could be due to your platform not being supported or dependencies not being met')
        print('\tPlease ensure all pre-requisites are met to run the full fledged client')
        print('BLECLI >> Please connect to BLE device `' + devname + '` manually using your tool of choice')
        resp = input('BLECLI >> Was the device connected successfully? [y/n] ')
        if resp not in ('Y', 'y'):
            return False
        print('BLECLI >> List available attributes of the connected device')
        resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ")
        return resp in ('Y', 'y')

    def get_nu_lookup(self):
        """Return None: the console client has no name-to-UUID table."""
        return None

    def has_characteristic(self, uuid):
        """Ask the user whether `uuid` is listed; return True on 'y' or 'Y'."""
        resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ")
        return resp in ('Y', 'y')

    def disconnect(self):
        """No-op: the connection is managed manually by the user."""
        pass

    def send_data(self, characteristic_uuid, data):
        """Show `data` as hex for the user to write, return their hex reply decoded."""
        print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :")
        print('\t>> ' + utils.str_to_hexstr(data))
        print('BLECLI >> Enter data read from characteristic (in hex) :')
        resp = input('\t<< ')
        return utils.hexstr_to_str(resp)


# --------------------------------------------------------------------


# Function to get client instance depending upon platform
def get_client():
    """Return a console client in fallback mode, otherwise a BlueZ client."""
    if fallback:
        return BLE_Console_Client()
    return BLE_Bluez_Client()