esp-idf/tools/ble/lib_gatt.py

431 wiersze
14 KiB
Python
Czysty Zwykły widok Historia

#!/usr/bin/env python
#
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Creating GATT Application which then becomes available to remote devices.
from __future__ import print_function
import sys
try:
import dbus
import dbus.service
except ImportError as e:
if 'linux' not in sys.platform:
raise e
print(e)
print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue')
print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue')
raise
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
SERVICE_UUIDS = {
'ALERT_NOTIF_SVC_UUID': '00001811-0000-1000-8000-00805f9b34fb'
}
CHAR_UUIDS = {
'SUPPORT_NEW_ALERT_UUID': '00002A47-0000-1000-8000-00805f9b34fb',
'ALERT_NOTIF_UUID': '00002A44-0000-1000-8000-00805f9b34fb',
'UNREAD_ALERT_STATUS_UUID': '00002A45-0000-1000-8000-00805f9b34fb'
}
DESCR_UUIDS = {
'CCCD_UUID': '00002902-0000-1000-8000-00805f9b34fb'
}
class InvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.NotSupported'
class Application(dbus.service.Object):
"""
org.bluez.GattApplication1 interface implementation
"""
def __init__(self, bus, path):
self.path = path
self.services = []
dbus.service.Object.__init__(self, bus, self.path)
def __del__(self):
pass
def add_service(self, service):
self.services.append(service)
@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
def GetManagedObjects(self):
response = {}
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
descs = chrc.get_descriptors()
for desc in descs:
response[desc.get_path()] = desc.get_properties()
return response
def get_path(self):
return dbus.ObjectPath(self.path)
def Release(self):
pass
class AlertNotificationApp(Application):
'''
Alert Notification Application
'''
def __init__(self, bus, path):
Application.__init__(self, bus, path)
self.service = AlertNotificationService(bus, '0001')
self.add_service(self.service)
class Service(dbus.service.Object):
"""
org.bluez.GattService1 interface implementation
"""
PATH_BASE = '/org/bluez/hci0/service'
def __init__(self, bus, index, uuid, primary=False):
self.path = self.PATH_BASE + str(index)
self.bus = bus
self.uuid = uuid
self.primary = primary
self.characteristics = []
dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self):
return {
GATT_SERVICE_IFACE: {
'UUID': self.uuid,
'Primary': self.primary,
'Characteristics': dbus.Array(
self.get_characteristic_paths(),
signature='o')
}
}
def get_path(self):
return dbus.ObjectPath(self.path)
def add_characteristic(self, characteristic):
self.characteristics.append(characteristic)
def get_characteristic_paths(self):
result = []
for chrc in self.characteristics:
result.append(chrc.get_path())
return result
def get_characteristics(self):
return self.characteristics
@dbus.service.method(DBUS_PROP_IFACE,
in_signature='s',
out_signature='a{sv}')
def GetAll(self, interface):
if interface != GATT_SERVICE_IFACE:
raise InvalidArgsException()
return self.get_properties()[GATT_SERVICE_IFACE]
class Characteristic(dbus.service.Object):
"""
org.bluez.GattCharacteristic1 interface implementation
"""
def __init__(self, bus, index, uuid, flags, service):
self.path = service.path + '/char' + str(index)
self.bus = bus
self.uuid = uuid
self.service = service
self.flags = flags
self.value = [0]
self.descriptors = []
dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self):
return {
GATT_CHRC_IFACE: {
'Service': self.service.get_path(),
'UUID': self.uuid,
'Flags': self.flags,
'Value': self.value,
'Descriptors': dbus.Array(self.get_descriptor_paths(), signature='o')
}
}
def get_path(self):
return dbus.ObjectPath(self.path)
def add_descriptor(self, descriptor):
self.descriptors.append(descriptor)
def get_descriptor_paths(self):
result = []
for desc in self.descriptors:
result.append(desc.get_path())
return result
def get_descriptors(self):
return self.descriptors
@dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
def GetAll(self, interface):
if interface != GATT_CHRC_IFACE:
raise InvalidArgsException()
return self.get_properties()[GATT_CHRC_IFACE]
@dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay')
def ReadValue(self, options):
print('\nDefault ReadValue called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
def WriteValue(self, value, options):
print('\nDefault WriteValue called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE)
def StartNotify(self):
print('Default StartNotify called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE)
def StopNotify(self):
print('Default StopNotify called, returning error')
raise NotSupportedException()
@dbus.service.signal(DBUS_PROP_IFACE,
signature='sa{sv}as')
def PropertiesChanged(self, interface, changed, invalidated):
pass
# print('\nProperties Changed')
class Descriptor(dbus.service.Object):
"""
org.bluez.GattDescriptor1 interface implementation
"""
def __init__(self, bus, index, uuid, flags, characteristic):
self.path = characteristic.path + '/desc' + str(index)
self.bus = bus
self.uuid = uuid
self.flags = flags
self.chrc = characteristic
dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self):
return {
GATT_DESC_IFACE: {
'Characteristic': self.chrc.get_path(),
'UUID': self.uuid,
'Flags': self.flags,
}
}
def get_path(self):
return dbus.ObjectPath(self.path)
@dbus.service.method(DBUS_PROP_IFACE,
in_signature='s',
out_signature='a{sv}')
def GetAll(self, interface):
if interface != GATT_DESC_IFACE:
raise InvalidArgsException()
return self.get_properties()[GATT_DESC_IFACE]
@dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay')
def ReadValue(self, options):
print('Default ReadValue called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
def WriteValue(self, value, options):
print('Default WriteValue called, returning error')
raise NotSupportedException()
@dbus.service.signal(DBUS_PROP_IFACE,
signature='sa{sv}as')
def PropertiesChanged(self, interface, changed, invalidated):
pass
# print('\nProperties Changed')
class AlertNotificationService(Service):
def __init__(self, bus, index):
Service.__init__(self, bus, index, SERVICE_UUIDS['ALERT_NOTIF_SVC_UUID'], primary=True)
self.add_characteristic(SupportedNewAlertCategoryCharacteristic(bus, '0001', self))
self.add_characteristic(AlertNotificationControlPointCharacteristic(bus, '0002', self))
self.add_characteristic(UnreadAlertStatusCharacteristic(bus, '0003', self))
def get_char_status(self, uuid, status):
for char in self.characteristics:
if char.uuid == uuid:
if status in char.status:
return True
return False
class SupportedNewAlertCategoryCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'],
['read'],
service)
self.value = [dbus.Byte(2)]
self.status = []
def ReadValue(self, options):
val_list = []
for val in self.value:
val_list.append(dbus.Byte(val))
print('Read Request received\n', '\tSupportedNewAlertCategoryCharacteristic')
print('\tValue:', '\t', val_list)
self.status.append('read')
return val_list
class AlertNotificationControlPointCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
CHAR_UUIDS['ALERT_NOTIF_UUID'],
['read', 'write'],
service)
self.value = [dbus.Byte(0)]
self.status = []
def ReadValue(self, options):
val_list = []
for val in self.value:
val_list.append(dbus.Byte(val))
print('Read Request received\n', '\tAlertNotificationControlPointCharacteristic')
print('\tValue:', '\t', val_list)
self.status.append('read')
return val_list
def WriteValue(self, value, options):
print('Write Request received\n', '\tAlertNotificationControlPointCharacteristic')
print('\tCurrent value:', '\t', self.value)
val_list = []
for val in value:
val_list.append(val)
self.value = val_list
self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
# Check if new value is written
print('\tNew value:', '\t', self.value)
if not self.value == value:
print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
self.status.append('write')
class UnreadAlertStatusCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'],
['read', 'write', 'notify'],
service)
self.value = [dbus.Byte(0)]
self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self)
self.add_descriptor(self.cccd_obj)
self.notifying = False
self.status = []
def StartNotify(self):
try:
if self.notifying:
print('\nAlready notifying, nothing to do')
return
print('Notify Started')
self.notifying = True
self.ReadValue()
self.WriteValue([dbus.Byte(1), dbus.Byte(0)])
self.status.append('notify')
except Exception as e:
print(e)
def StopNotify(self):
if not self.notifying:
print('\nNot notifying, nothing to do')
return
self.notifying = False
print('\nNotify Stopped')
def ReadValue(self, options=None):
val_list = []
for val in self.value:
val_list.append(dbus.Byte(val))
self.status.append('read')
print('\tValue:', '\t', val_list)
return val_list
def WriteValue(self, value, options=None):
val_list = []
for val in value:
val_list.append(val)
self.value = val_list
self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
# Check if new value is written
if not self.value == value:
print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
print('New value:', '\t', self.value)
self.status.append('write')
class ClientCharacteristicConfigurationDescriptor(Descriptor):
def __init__(self, bus, index, characteristic):
self.value = [dbus.Byte(1)]
Descriptor.__init__(
self, bus, index,
DESCR_UUIDS['CCCD_UUID'],
['read', 'write'],
characteristic)
def ReadValue(self, options=None):
return self.value
def WriteValue(self, value, options=None):
val_list = []
for val in value:
val_list.append(val)
self.value = val_list
self.PropertiesChanged(GATT_DESC_IFACE, {'Value': self.value}, [])
# Check if new value is written
if not self.value == value:
print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)