diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index d114c7a7..e3556a7d 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -12,7 +12,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v2 with: - python-version: '3.7' + python-version: '3.12' - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 41488b0e..78112a4f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -4,8 +4,9 @@ jobs: tests: runs-on: ubuntu-latest strategy: + fail-fast: false matrix: - python-version: [2.7, 3.9] + python-version: [3.9, 3.12] steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} diff --git a/.gitignore b/.gitignore index 1f807bb1..52b1013b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,6 @@ cover/ .coverage output.xml flake8.log -nosetests.xml pylint.log .idea/ dist/ diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 19d580b9..9efbd389 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,11 @@ Release History ================ +V5.0.0 +------ + +* Drop support for Python 2 + V4.0.5 ------ diff --git a/Makefile b/Makefile index a1272a09..f0304363 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ uninstall: clean: @rm -rf *.egg* build dist *.py[oc] */*.py[co] cover doctest_pypi.cfg \ - nosetests.xml pylint.log output.xml flake8.log tests.log \ + pylint.log output.xml flake8.log tests.log \ test-result.xml htmlcov fab.log .coverage publish: diff --git a/README.rst b/README.rst index 30cd5d12..d0203f9f 100644 --- a/README.rst +++ b/README.rst @@ -23,10 +23,7 @@ leaves something to be desired and only works in Linux. Requirements ------------ -- Python 2.7.5 or greater, or Python 3.5 or greater - - - Python 2.7.3's ``struct`` library has a bug that will break PyGATT - 2.7.5 - or greater is recommended. +- Python 3.9 or greater - BlueZ 5.18 or greater (with gatttool) - required for the gatttool backend only. diff --git a/pygatt/__init__.py b/pygatt/__init__.py index e1ae7ddd..64c79a94 100644 --- a/pygatt/__init__.py +++ b/pygatt/__init__.py @@ -5,6 +5,7 @@ class NullHandler(logging.Handler): def emit(self, record): pass + # Initialize a null handler for logging to avoid printing spurious "No handlers # could be found for logger" messages. logging.getLogger(__name__).addHandler(NullHandler()) diff --git a/pygatt/backends/backend.py b/pygatt/backends/backend.py index 930bed1b..4e6c6ccf 100644 --- a/pygatt/backends/backend.py +++ b/pygatt/backends/backend.py @@ -6,7 +6,7 @@ DEFAULT_CONNECT_TIMEOUT_S = 5.0 -BLEAddressType = Enum('BLEAddressType', 'public random') +BLEAddressType = Enum("BLEAddressType", "public random") class BLEBackend(object): @@ -21,8 +21,7 @@ def start(self): raise NotImplementedError() def stop(self): - """Stop and free any resources required while the backend is running. - """ + """Stop and free any resources required while the backend is running.""" raise NotImplementedError() def supports_unbonded(self): @@ -54,8 +53,7 @@ def filtered_scan(self, name_filter="", *args, **kwargs): Returns a list of BLE devices found. """ devices = self.scan(*args, **kwargs) - return [device for device in devices - if name_filter in (device['name'] or '')] + return [device for device in devices if name_filter in (device["name"] or "")] def clear_bond(self, address=None): raise NotImplementedError() @@ -67,6 +65,7 @@ class Characteristic(object): Only valid for the lifespan of a BLE connection, since the handle values are dynamic. """ + def __init__(self, uuid, handle): """ Sets the characteritic uuid and handle. @@ -86,5 +85,8 @@ def add_descriptor(self, uuid, handle): self.descriptors[uuid] = handle def __str__(self): - return "<%s uuid=%s handle=%d>" % (self.__class__.__name__, - self.uuid, self.handle) + return "<%s uuid=%s handle=%d>" % ( + self.__class__.__name__, + self.uuid, + self.handle, + ) diff --git a/pygatt/backends/bgapi/bgapi.py b/pygatt/backends/bgapi/bgapi.py index 6f4ea2df..30f72c25 100755 --- a/pygatt/backends/bgapi/bgapi.py +++ b/pygatt/backends/bgapi/bgapi.py @@ -32,11 +32,21 @@ import termios except ImportError: # Running in Windows (not Linux/OS X/Cygwin) - serial_exceptions = (RuntimeError, ValueError, - BGAPIError, serial.serialutil.SerialException) + serial_exceptions = ( + RuntimeError, + ValueError, + BGAPIError, + serial.serialutil.SerialException, + ) else: - serial_exceptions = (termios.error, IOError, OSError, TypeError, - BGAPIError, serial.serialutil.SerialException) + serial_exceptions = ( + termios.error, + IOError, + OSError, + TypeError, + BGAPIError, + serial.serialutil.SerialException, + ) log = logging.getLogger(__name__) @@ -46,9 +56,10 @@ MAX_CONNECTION_ATTEMPTS = 10 -UUIDType = Enum('UUIDType', ['custom', 'service', 'attribute', - 'descriptor', 'characteristic', - 'nonstandard']) +UUIDType = Enum( + "UUIDType", + ["custom", "service", "attribute", "descriptor", "characteristic", "nonstandard"], +) def _timed_out(start_time, timeout): @@ -56,9 +67,8 @@ def _timed_out(start_time, timeout): def bgapi_address_to_hex(address): - address = hexlify(bytearray( - list(reversed(address)))).upper().decode('ascii') - return ':'.join(''.join(pair) for pair in zip(*[iter(address)] * 2)) + address = hexlify(bytearray(list(reversed(address)))).upper().decode("ascii") + return ":".join("".join(pair) for pair in zip(*[iter(address)] * 2)) class AdvertisingAndScanInfo(object): @@ -66,6 +76,7 @@ class AdvertisingAndScanInfo(object): Holds the advertising and scan response packet data from a device at a given address. """ + def __init__(self): self.name = "" self.address = "" @@ -79,6 +90,7 @@ class BGAPIBackend(BLEBackend): """ A BLE backend for a BGAPI compatible USB adapter. """ + def __init__(self, serial_port=None, receive_queue_timeout=0.1): """ Initialize the backend, but don't start the USB connection yet. Must @@ -114,15 +126,17 @@ def __init__(self, serial_port=None, receive_queue_timeout=0.1): self._current_characteristic = None # used in char/descriptor discovery self._packet_handlers = { ResponsePacketType.sm_get_bonds: self._ble_rsp_sm_get_bonds, - ResponsePacketType.system_address_get: ( - self._ble_rsp_system_address_get), + ResponsePacketType.system_address_get: (self._ble_rsp_system_address_get), EventPacketType.attclient_attribute_value: ( - self._ble_evt_attclient_attribute_value), + self._ble_evt_attclient_attribute_value + ), EventPacketType.attclient_find_information_found: ( - self._ble_evt_attclient_find_information_found), + self._ble_evt_attclient_find_information_found + ), EventPacketType.connection_status: self._ble_evt_connection_status, EventPacketType.connection_disconnected: ( - self._ble_evt_connection_disconnected), + self._ble_evt_connection_disconnected + ), EventPacketType.gap_scan_response: self._ble_evt_gap_scan_response, EventPacketType.sm_bond_status: self._ble_evt_sm_bond_status, } @@ -137,21 +151,21 @@ def _detect_device_port(self): except serial.serialutil.SerialException: raise BGAPIError( "Unable to detect BLED112 serial port: {}.".format( - self._serial_port)) + self._serial_port + ) + ) else: log.info("Auto-detecting serial port for BLED112") detected_devices = find_usb_serial_devices( - vendor_id=BLED112_VENDOR_ID, - product_id=BLED112_PRODUCT_ID) + vendor_id=BLED112_VENDOR_ID, product_id=BLED112_PRODUCT_ID + ) if len(detected_devices) == 0: raise BGAPIError("Unable to auto-detect BLED112 serial port") - log.info("Found BLED112 on serial port %s", - detected_devices[0].port_name) + log.info("Found BLED112 on serial port %s", detected_devices[0].port_name) return detected_devices[0].port_name - def _open_serial_port(self, - max_connection_attempts=MAX_CONNECTION_ATTEMPTS): + def _open_serial_port(self, max_connection_attempts=MAX_CONNECTION_ATTEMPTS): """ Open a connection to the named serial port, or auto-detect the first port matching the BLED device. This will wait until data can actually be @@ -165,14 +179,12 @@ def _open_serial_port(self, attempts, with a short pause in between each attempt. """ for attempt in range(max_connection_attempts): - log.debug("Opening connection to serial port (attempt %d)", - attempt + 1) + log.debug("Opening connection to serial port (attempt %d)", attempt + 1) try: serial_port = self._serial_port or self._detect_device_port() self._ser = None - self._ser = serial.Serial(serial_port, baudrate=115200, - timeout=0.25) + self._ser = serial.Serial(serial_port, baudrate=115200, timeout=0.25) # Wait until we can actually read from the device self._ser.read() break @@ -181,13 +193,13 @@ def _open_serial_port(self, if self._ser: self._ser.close() elif attempt == (max_connection_attempts - 1): - raise NotConnectedError( - "No BGAPI compatible device detected") + raise NotConnectedError("No BGAPI compatible device detected") self._ser = None time.sleep(0.25) else: - raise NotConnectedError("Unable to reconnect with USB " - "device after rebooting") + raise NotConnectedError( + "Unable to reconnect with USB " "device after rebooting" + ) def start(self, reset=True, delay_after_reset_s=1): """ @@ -211,8 +223,7 @@ def start(self, reset=True, delay_after_reset_s=1): # The zero param just means we want to do a normal restart instead of # starting a firmware update restart. if reset: - log.info( - "Resetting and reconnecting to device for a clean environment") + log.info("Resetting and reconnecting to device for a clean environment") self._open_serial_port() self.send_command(CommandBuilder.system_reset(0)) self._ser.close() @@ -256,7 +267,7 @@ def stop(self): pass if self._running: if self._running.is_set(): - log.info('Stopping') + log.info("Stopping") self._running.clear() if self._receiver: @@ -270,15 +281,19 @@ def stop(self): def set_bondable(self, bondable): self.send_command( CommandBuilder.sm_set_bondable_mode( - constants.bondable['yes' if bondable else 'no'])) + constants.bondable["yes" if bondable else "no"] + ) + ) self.expect(ResponsePacketType.sm_set_bondable_mode) def disable_advertising(self): log.info("Disabling advertising") self.send_command( CommandBuilder.gap_set_mode( - constants.gap_discoverable_mode['non_discoverable'], - constants.gap_connectable_mode['non_connectable'])) + constants.gap_discoverable_mode["non_discoverable"], + constants.gap_connectable_mode["non_connectable"], + ) + ) self.expect(ResponsePacketType.gap_set_mode) def send_command(self, *args, **kwargs): @@ -320,9 +335,16 @@ def clear_bond(self, address=None): self.send_command(CommandBuilder.sm_delete_bonding(b)) self.expect(ResponsePacketType.sm_delete_bonding) - def scan(self, timeout=10, scan_interval=75, scan_window=50, active=True, - discover_mode=constants.gap_discover_mode['observation'], - scan_cb=None, **kwargs): + def scan( + self, + timeout=10, + scan_interval=75, + scan_window=50, + active=True, + discover_mode=constants.gap_discover_mode["observation"], + scan_cb=None, + **kwargs + ): """ Perform a scan to discover BLE devices. @@ -345,7 +367,8 @@ def scan(self, timeout=10, scan_interval=75, scan_window=50, active=True, self.send_command( CommandBuilder.gap_set_scan_parameters( scan_interval, scan_window, parameters - )) + ) + ) self.expect(ResponsePacketType.gap_set_scan_parameters) @@ -361,8 +384,7 @@ def scan(self, timeout=10, scan_interval=75, scan_window=50, active=True, while self._evt.is_set(): try: - self.expect(EventPacketType.gap_scan_response, - timeout=timeout) + self.expect(EventPacketType.gap_scan_response, timeout=timeout) except ExpectedResponseTimeout: pass if _timed_out(start_time, timeout): @@ -374,12 +396,14 @@ def scan(self, timeout=10, scan_interval=75, scan_window=50, active=True, devices = [] for address, info in self._devices_discovered.items(): - devices.append({ - 'address': address, - 'name': info.name, - 'rssi': info.rssi, - 'packet_data': info.packet_data - }) + devices.append( + { + "address": address, + "name": info.name, + "rssi": info.rssi, + "packet_data": info.packet_data, + } + ) log.info("Discovered %d devices: %s", len(devices), devices) self._devices_discovered = {} return devices @@ -388,10 +412,16 @@ def _end_procedure(self): self.send_command(CommandBuilder.gap_end_procedure()) self.expect(ResponsePacketType.gap_end_procedure) - def connect(self, address, timeout=5, - address_type=BLEAddressType.public, - interval_min=60, interval_max=76, supervision_timeout=100, - latency=0): + def connect( + self, + address, + timeout=5, + address_type=BLEAddressType.public, + interval_min=60, + interval_max=76, + supervision_timeout=100, + latency=0, + ): """ Connect directly to a device given the ble address then discovers and stores the characteristic and characteristic descriptor handles. @@ -410,24 +440,28 @@ def connect(self, address, timeout=5, if device._address == bgapi_address_to_hex(address_bytes): return device - log.info("Connecting to device at address %s (timeout %ds)", - address, timeout) + log.info("Connecting to device at address %s (timeout %ds)", address, timeout) self.set_bondable(False) if address_type == BLEAddressType.public: - addr_type = constants.ble_address_type['gap_address_type_public'] + addr_type = constants.ble_address_type["gap_address_type_public"] else: - addr_type = constants.ble_address_type['gap_address_type_random'] + addr_type = constants.ble_address_type["gap_address_type_random"] self.send_command( CommandBuilder.gap_connect_direct( - address_bytes, addr_type, interval_min, interval_max, - supervision_timeout, latency)) + address_bytes, + addr_type, + interval_min, + interval_max, + supervision_timeout, + latency, + ) + ) try: self.expect(ResponsePacketType.gap_connect_direct) - _, packet = self.expect(EventPacketType.connection_status, - timeout=timeout) + _, packet = self.expect(EventPacketType.connection_status, timeout=timeout) # TODO what do we do if the status isn't 'connected'? Retry? # Raise an exception? Should also check the address matches the # expected TODO i'm finding that when reconnecting to the same @@ -435,17 +469,18 @@ def connect(self, address, timeout=5, # picked up here as "connected", then we don't get anything # else. if self._connection_status_flag( - packet['flags'], - constants.connection_status_flag['connected']): + packet["flags"], constants.connection_status_flag["connected"] + ): device = BGAPIBLEDevice( - bgapi_address_to_hex(packet['address']), - packet['connection_handle'], - self) + bgapi_address_to_hex(packet["address"]), + packet["connection_handle"], + self, + ) if self._connection_status_flag( - packet['flags'], - constants.connection_status_flag['encrypted']): + packet["flags"], constants.connection_status_flag["encrypted"] + ): device.encrypted = True - self._connections[packet['connection_handle']] = device + self._connections[packet["connection_handle"]] = device log.info("Connected to %s", address) return device except ExpectedResponseTimeout: @@ -463,27 +498,29 @@ def connect(self, address, timeout=5, def discover_characteristics(self, connection_handle, timeout=30): att_handle_start = 0x0001 # first valid handle att_handle_end = 0xFFFF # last valid handle - log.info("Fetching characteristics for connection %d", - connection_handle) + log.info("Fetching characteristics for connection %d", connection_handle) self.send_command( CommandBuilder.attclient_find_information( - connection_handle, att_handle_start, att_handle_end)) + connection_handle, att_handle_start, att_handle_end + ) + ) self.expect(ResponsePacketType.attclient_find_information) try: - self.expect(EventPacketType.attclient_procedure_completed, - timeout=timeout) + self.expect(EventPacketType.attclient_procedure_completed, timeout=timeout) except ExpectedResponseTimeout: log.warn("Continuing even though discovery hasn't finished") - for char_uuid_str, char_obj in ( - self._characteristics[connection_handle].items()): - log.info("Characteristic 0x%s is handle 0x%x", - char_uuid_str, char_obj.handle) - for desc_uuid_str, desc_handle in ( - char_obj.descriptors.items()): - log.info("Characteristic descriptor 0x%s is handle 0x%x", - desc_uuid_str, desc_handle) + for char_uuid_str, char_obj in self._characteristics[connection_handle].items(): + log.info( + "Characteristic 0x%s is handle 0x%x", char_uuid_str, char_obj.handle + ) + for desc_uuid_str, desc_handle in char_obj.descriptors.items(): + log.info( + "Characteristic descriptor 0x%s is handle 0x%x", + desc_uuid_str, + desc_handle, + ) return self._characteristics[connection_handle] @staticmethod @@ -519,7 +556,7 @@ def _get_uuid_type(uuid): if uuid in constants.gatt_characteristic_type_uuid.values(): return UUIDType.characteristic - log.warn("Unrecognized 4 byte UUID %s", hexlify(uuid)) + log.warning("Unrecognized 4 byte UUID %s", hexlify(uuid)) return UUIDType.nonstandard def _scan_rsp_data(self, data): @@ -552,33 +589,35 @@ def _scan_rsp_data(self, data): bytes_left_in_field -= 1 if bytes_left_in_field == 0: # End of field - field_name = ( - constants.scan_response_data_type[field_value[0]]) + field_name = constants.scan_response_data_type[field_value[0]] field_value = field_value[1:] # Field type specific formats - if (field_name == 'complete_local_name' or - field_name == 'shortened_local_name'): + if ( + field_name == "complete_local_name" + or field_name == "shortened_local_name" + ): try: dev_name = bytearray(field_value).decode("utf-8") - except: - log.debug("Failed to decode data as UTF8, " - "returning verbatim") + except Exception: + log.debug( + "Failed to decode data as UTF8, " "returning verbatim" + ) else: data_dict[field_name] = dev_name - elif (field_name == - 'complete_list_128-bit_service_class_uuids'): + elif field_name == "complete_list_128-bit_service_class_uuids": if len(field_value) % 16 == 0: # 16 bytes data_dict[field_name] = [] for i in range(0, int(len(field_value) / 16)): - service_uuid = ( - "0x%s" % - bgapi_address_to_hex( - field_value[i * 16:i * 16 + 16])) + service_uuid = "0x%s" % bgapi_address_to_hex( + field_value[i * 16 : i * 16 + 16] + ) data_dict[field_name].append(service_uuid) else: - log.warning("Expected a service class UUID of 16\ + log.warning( + "Expected a service class UUID of 16\ bytes. Instead received %d bytes", - len(field_value)) + len(field_value), + ) else: data_dict[field_name] = bytearray(field_value) return dev_name, data_dict @@ -586,8 +625,9 @@ def _scan_rsp_data(self, data): def expect(self, expected, *args, **kargs): return self.expect_any([expected], *args, **kargs) - def expect_any(self, expected_packet_choices, timeout=None, - assert_return_success=True): + def expect_any( + self, expected_packet_choices, timeout=None, assert_return_success=True + ): """ Process packets until a packet of one of the expected types is found. @@ -602,8 +642,11 @@ def expect_any(self, expected_packet_choices, timeout=None, not receiving withint the time limit. """ timeout = timeout or 1 - log.debug("Expecting a response of one of %s within %fs", - expected_packet_choices, timeout or 0) + log.debug( + "Expecting a response of one of %s within %fs", + expected_packet_choices, + timeout or 0, + ) start_time = None if timeout is not None: @@ -612,13 +655,11 @@ def expect_any(self, expected_packet_choices, timeout=None, while True: packet = None try: - packet = self._receiver_queue.get( - timeout=self._receive_queue_timeout) + packet = self._receiver_queue.get(timeout=self._receive_queue_timeout) except queue.Empty: if timeout is not None: if _timed_out(start_time, timeout): - exc = ExpectedResponseTimeout( - expected_packet_choices, timeout) + exc = ExpectedResponseTimeout(expected_packet_choices, timeout) exc.__cause__ = None raise exc continue @@ -632,9 +673,10 @@ def expect_any(self, expected_packet_choices, timeout=None, log.warn("Ignoring message decode failure", exc_info=True) continue - return_code = response.get('result', 0) - log.debug("Received a %s packet: %s", - packet_type, get_return_message(return_code)) + return_code = response.get("result", 0) + log.debug( + "Received a %s packet: %s", packet_type, get_return_message(return_code) + ) if packet_type in self._packet_handlers: self._packet_handlers[packet_type](response) @@ -657,9 +699,10 @@ def _receive(self): log.warn("Ignoring message decode failure", exc_info=True) continue if packet_type == EventPacketType.attclient_attribute_value: - device = self._connections[args['connection_handle']] - device.receive_notification(args['atthandle'], - bytearray(args['value'])) + device = self._connections[args["connection_handle"]] + device.receive_notification( + args["atthandle"], bytearray(args["value"]) + ) self._receiver_queue.put(packet) log.info("Stopping receiver") @@ -670,9 +713,9 @@ def _ble_evt_attclient_attribute_value(self, args): args -- dictionary containing the attribute handle ('atthandle'), attribute type ('type'), and attribute value ('value') """ - log.debug("attribute handle = %x", args['atthandle']) - log.debug("attribute type = %x", args['type']) - log.debug("attribute value = 0x%s", hexlify(bytearray(args['value']))) + log.debug("attribute handle = %x", args["atthandle"]) + log.debug("attribute type = %x", args["type"]) + log.debug("attribute value = 0x%s", hexlify(bytearray(args["value"]))) def _ble_evt_attclient_find_information_found(self, args): """ @@ -691,40 +734,44 @@ def _ble_evt_attclient_find_information_found(self, args): args -- dictionary containing the characteristic handle ('chrhandle'), and characteristic UUID ('uuid') """ - raw_uuid = bytearray(reversed(args['uuid'])) + raw_uuid = bytearray(reversed(args["uuid"])) # Convert 4-byte UUID shorthand to a full, 16-byte UUID uuid_type = self._get_uuid_type(raw_uuid) if uuid_type != UUIDType.custom: - uuid = uuid16_to_uuid(int( - bgapi_address_to_hex(args['uuid']).replace(':', ''), 16)) + uuid = uuid16_to_uuid( + int(bgapi_address_to_hex(args["uuid"]).replace(":", ""), 16) + ) else: uuid = UUID(bytes=bytes(raw_uuid)) # TODO is there a way to get the characteristic from the packet instead # of having to track the "current" characteristic? - if (uuid_type == UUIDType.descriptor and - self._current_characteristic is not None): - self._current_characteristic.add_descriptor(uuid, args['chrhandle']) - elif (uuid_type == UUIDType.custom or - uuid_type == UUIDType.nonstandard or - uuid_type == UUIDType.characteristic): + if ( + uuid_type == UUIDType.descriptor + and self._current_characteristic is not None + ): + self._current_characteristic.add_descriptor(uuid, args["chrhandle"]) + elif ( + uuid_type == UUIDType.custom + or uuid_type == UUIDType.nonstandard + or uuid_type == UUIDType.characteristic + ): if uuid_type == UUIDType.custom: log.info("Found custom characteristic %s" % uuid) elif uuid_type == UUIDType.characteristic: log.info("Found approved characteristic %s" % uuid) elif uuid_type == UUIDType.nonstandard: log.info("Found nonstandard 4-byte characteristic %s" % uuid) - new_char = Characteristic(uuid, args['chrhandle']) + new_char = Characteristic(uuid, args["chrhandle"]) self._current_characteristic = new_char - self._characteristics[ - args['connection_handle']][uuid] = new_char + self._characteristics[args["connection_handle"]][uuid] = new_char def _ble_evt_connection_disconnected(self, args): """ Handles the event for the termination of a connection. """ - self._connections.pop(args['connection_handle'], None) + self._connections.pop(args["connection_handle"], None) def _ble_evt_connection_status(self, args): """ @@ -736,23 +783,25 @@ def _ble_evt_connection_status(self, args): (timeout'), device latency ('latency'), device bond handle ('bonding') """ - connection_handle = args['connection_handle'] + connection_handle = args["connection_handle"] if not self._connection_status_flag( - args['flags'], - constants.connection_status_flag['connected']): + args["flags"], constants.connection_status_flag["connected"] + ): # Disconnected self._connections.pop(connection_handle, None) - log.info("Connection status: handle=0x%x, flags=%s, address=0x%s, " - "connection interval=%fms, timeout=%d, " - "latency=%d intervals, bonding=0x%x", - connection_handle, - args['flags'], - hexlify(bytearray(args['address'])), - args['conn_interval'] * 1.25, - args['timeout'] * 10, - args['latency'], - args['bonding']) + log.info( + "Connection status: handle=0x%x, flags=%s, address=0x%s, " + "connection interval=%fms, timeout=%d, " + "latency=%d intervals, bonding=0x%x", + connection_handle, + args["flags"], + hexlify(bytearray(args["address"])), + args["conn_interval"] * 1.25, + args["timeout"] * 10, + args["latency"], + args["bonding"], + ) def _ble_evt_gap_scan_response(self, args): """ @@ -766,9 +815,9 @@ def _ble_evt_gap_scan_response(self, args): scan resonse data list ('data') """ # Parse packet - packet_type = constants.scan_response_packet_type[args['packet_type']] - address = bgapi_address_to_hex(args['sender']) - name, data_dict = self._scan_rsp_data(args['data']) + packet_type = constants.scan_response_packet_type[args["packet_type"]] + address = bgapi_address_to_hex(args["sender"]) + name, data_dict = self._scan_rsp_data(args["data"]) # Store device information if address not in self._devices_discovered: @@ -778,12 +827,17 @@ def _ble_evt_gap_scan_response(self, args): dev.name = name if dev.address == "": dev.address = address - if (packet_type not in dev.packet_data or - len(dev.packet_data[packet_type]) < len(data_dict)): + if packet_type not in dev.packet_data or len( + dev.packet_data[packet_type] + ) < len(data_dict): dev.packet_data[packet_type] = data_dict - dev.rssi = args['rssi'] - log.debug("Received a scan response from %s with rssi=%d dBM " - "and data=%s", address, args['rssi'], data_dict) + dev.rssi = args["rssi"] + log.debug( + "Received a scan response from %s with rssi=%d dBM " "and data=%s", + address, + args["rssi"], + data_dict, + ) if self._scan_cb is not None: if self._scan_cb(self._devices_discovered, address, packet_type): @@ -800,7 +854,7 @@ def _ble_evt_sm_bond_status(self, args): middle used ('mitm'), keys stored for bonding ('keys') """ # Add to list of stored bonds found or set flag - self._stored_bonds.append(args['bond']) + self._stored_bonds.append(args["bond"]) def _ble_rsp_sm_delete_bonding(self, args): """ @@ -808,7 +862,7 @@ def _ble_rsp_sm_delete_bonding(self, args): args -- dictionary containing the return code ('result') """ - result = args['result'] + result = args["result"] if result == 0: self._stored_bonds.pop() return result @@ -820,8 +874,8 @@ def _ble_rsp_sm_get_bonds(self, args): args -- dictionary containing the number of stored bonds ('bonds'), """ - self._num_bonds = args['bonds'] - log.debug("num bonds = %d", args['bonds']) + self._num_bonds = args["bonds"] + log.debug("num bonds = %d", args["bonds"]) def _ble_rsp_system_address_get(self, args): """ @@ -830,5 +884,5 @@ def _ble_rsp_system_address_get(self, args): args -- dictionary containing the mac address ('address'), """ - self.address = args['address'] - log.debug("Adapter address = {0}".format(args['address'])) + self.address = args["address"] + log.debug("Adapter address = {0}".format(args["address"])) diff --git a/pygatt/backends/bgapi/bglib.py b/pygatt/backends/bgapi/bglib.py index ddec9be6..97259f27 100644 --- a/pygatt/backends/bgapi/bglib.py +++ b/pygatt/backends/bgapi/bglib.py @@ -44,131 +44,137 @@ class UnknownMessageType(Exception): pass -ResponsePacketType = Enum('ResponsePacketType', [ - 'system_reset', - 'system_hello', - 'system_address_get', - 'system_reg_write', - 'system_reg_read', - 'system_get_counters', - 'system_get_connections', - 'system_read_memory', - 'system_get_info', - 'system_endpoint_tx', - 'system_whitelist_append', - 'system_whitelist_remove', - 'system_whitelist_clear', - 'system_endpoint_rx', - 'system_endpoint_set_watermarks', - 'flash_ps_defrag', - 'flash_ps_dump', - 'flash_ps_erase_all', - 'flash_ps_save', - 'flash_ps_load', - 'flash_ps_erase', - 'flash_erase_page', - 'flash_write_words', - 'attributes_write', - 'attributes_read', - 'attributes_read_type', - 'attributes_user_read_response', - 'attributes_user_write_response', - 'connection_disconnect', - 'connection_get_rssi', - 'connection_update', - 'connection_version_update', - 'connection_channel_map_get', - 'connection_channel_map_set', - 'connection_features_get', - 'connection_get_status', - 'connection_raw_tx', - 'attclient_find_by_type_value', - 'attclient_read_by_group_type', - 'attclient_read_by_type', - 'attclient_find_information', - 'attclient_read_by_handle', - 'attclient_attribute_write', - 'attclient_write_command', - 'attclient_indicate_confirm', - 'attclient_read_long', - 'attclient_prepare_write', - 'attclient_execute_write', - 'attclient_read_multiple', - 'sm_encrypt_start', - 'sm_set_bondable_mode', - 'sm_delete_bonding', - 'sm_set_parameters', - 'sm_passkey_entry', - 'sm_get_bonds', - 'sm_set_oob_data', - 'gap_set_privacy_flags', - 'gap_set_mode', - 'gap_discover', - 'gap_connect_direct', - 'gap_end_procedure', - 'gap_connect_selective', - 'gap_set_filtering', - 'gap_set_scan_parameters', - 'gap_set_adv_parameters', - 'gap_set_adv_data', - 'gap_set_directed_connectable_mode', - 'hardware_io_port_config_irq', - 'hardware_set_soft_timer', - 'hardware_adc_read', - 'hardware_io_port_config_direction', - 'hardware_io_port_config_function', - 'hardware_io_port_config_pull', - 'hardware_io_port_write', - 'hardware_io_port_read', - 'hardware_spi_config', - 'hardware_spi_transfer', - 'hardware_i2c_read', - 'hardware_i2c_write', - 'hardware_set_txpower', - 'hardware_timer_comparator', - 'test_phy_tx', - 'test_phy_rx', - 'test_phy_end', - 'test_phy_reset', - 'test_get_channel_map', - 'test_debug', -]) - - -EventPacketType = Enum('EventPacketType', [ - 'system_boot', - 'system_debug', - 'system_endpoint_watermark_rx', - 'system_endpoint_watermark_tx', - 'system_script_failure', - 'system_no_license_key', - 'flash_ps_key', - 'attributes_value', - 'attributes_user_read_request', - 'attributes_status', - 'connection_status', - 'connection_version_ind', - 'connection_feature_ind', - 'connection_raw_rx', - 'connection_disconnected', - 'attclient_indicated', - 'attclient_procedure_completed', - 'attclient_group_found', - 'attclient_attribute_found', - 'attclient_find_information_found', - 'attclient_attribute_value', - 'attclient_read_multiple_response', - 'sm_smp_data', - 'sm_bonding_fail', - 'sm_passkey_display', - 'sm_passkey_request', - 'sm_bond_status', - 'gap_scan_response', - 'gap_mode_changed', - 'hardware_io_port_status', - 'hardware_soft_timer', - 'hardware_adc_result', -]) +ResponsePacketType = Enum( + "ResponsePacketType", + [ + "system_reset", + "system_hello", + "system_address_get", + "system_reg_write", + "system_reg_read", + "system_get_counters", + "system_get_connections", + "system_read_memory", + "system_get_info", + "system_endpoint_tx", + "system_whitelist_append", + "system_whitelist_remove", + "system_whitelist_clear", + "system_endpoint_rx", + "system_endpoint_set_watermarks", + "flash_ps_defrag", + "flash_ps_dump", + "flash_ps_erase_all", + "flash_ps_save", + "flash_ps_load", + "flash_ps_erase", + "flash_erase_page", + "flash_write_words", + "attributes_write", + "attributes_read", + "attributes_read_type", + "attributes_user_read_response", + "attributes_user_write_response", + "connection_disconnect", + "connection_get_rssi", + "connection_update", + "connection_version_update", + "connection_channel_map_get", + "connection_channel_map_set", + "connection_features_get", + "connection_get_status", + "connection_raw_tx", + "attclient_find_by_type_value", + "attclient_read_by_group_type", + "attclient_read_by_type", + "attclient_find_information", + "attclient_read_by_handle", + "attclient_attribute_write", + "attclient_write_command", + "attclient_indicate_confirm", + "attclient_read_long", + "attclient_prepare_write", + "attclient_execute_write", + "attclient_read_multiple", + "sm_encrypt_start", + "sm_set_bondable_mode", + "sm_delete_bonding", + "sm_set_parameters", + "sm_passkey_entry", + "sm_get_bonds", + "sm_set_oob_data", + "gap_set_privacy_flags", + "gap_set_mode", + "gap_discover", + "gap_connect_direct", + "gap_end_procedure", + "gap_connect_selective", + "gap_set_filtering", + "gap_set_scan_parameters", + "gap_set_adv_parameters", + "gap_set_adv_data", + "gap_set_directed_connectable_mode", + "hardware_io_port_config_irq", + "hardware_set_soft_timer", + "hardware_adc_read", + "hardware_io_port_config_direction", + "hardware_io_port_config_function", + "hardware_io_port_config_pull", + "hardware_io_port_write", + "hardware_io_port_read", + "hardware_spi_config", + "hardware_spi_transfer", + "hardware_i2c_read", + "hardware_i2c_write", + "hardware_set_txpower", + "hardware_timer_comparator", + "test_phy_tx", + "test_phy_rx", + "test_phy_end", + "test_phy_reset", + "test_get_channel_map", + "test_debug", + ], +) + + +EventPacketType = Enum( + "EventPacketType", + [ + "system_boot", + "system_debug", + "system_endpoint_watermark_rx", + "system_endpoint_watermark_tx", + "system_script_failure", + "system_no_license_key", + "flash_ps_key", + "attributes_value", + "attributes_user_read_request", + "attributes_status", + "connection_status", + "connection_version_ind", + "connection_feature_ind", + "connection_raw_rx", + "connection_disconnected", + "attclient_indicated", + "attclient_procedure_completed", + "attclient_group_found", + "attclient_attribute_found", + "attclient_find_information_found", + "attclient_attribute_value", + "attclient_read_multiple_response", + "sm_smp_data", + "sm_bonding_fail", + "sm_passkey_display", + "sm_passkey_request", + "sm_bond_status", + "gap_scan_response", + "gap_mode_changed", + "hardware_io_port_status", + "hardware_soft_timer", + "hardware_adc_result", + ], +) # Map a tuple of (class, command) to an enum identifier for the packet RESPONSE_PACKET_MAPPING = { @@ -187,7 +193,6 @@ class UnknownMessageType(Exception): (0, 12): ResponsePacketType.system_whitelist_clear, (0, 13): ResponsePacketType.system_endpoint_rx, (0, 14): ResponsePacketType.system_endpoint_set_watermarks, - (1, 0): ResponsePacketType.flash_ps_defrag, (1, 1): ResponsePacketType.flash_ps_dump, (1, 2): ResponsePacketType.flash_ps_erase_all, @@ -196,13 +201,11 @@ class UnknownMessageType(Exception): (1, 5): ResponsePacketType.flash_ps_erase, (1, 6): ResponsePacketType.flash_erase_page, (1, 7): ResponsePacketType.flash_write_words, - (2, 0): ResponsePacketType.attributes_write, (2, 1): ResponsePacketType.attributes_read, (2, 2): ResponsePacketType.attributes_read_type, (2, 3): ResponsePacketType.attributes_user_read_response, (2, 4): ResponsePacketType.attributes_user_write_response, - (3, 0): ResponsePacketType.connection_disconnect, (3, 1): ResponsePacketType.connection_get_rssi, (3, 2): ResponsePacketType.connection_update, @@ -212,7 +215,6 @@ class UnknownMessageType(Exception): (3, 6): ResponsePacketType.connection_features_get, (3, 7): ResponsePacketType.connection_get_status, (3, 8): ResponsePacketType.connection_raw_tx, - (4, 0): ResponsePacketType.attclient_find_by_type_value, (4, 1): ResponsePacketType.attclient_read_by_group_type, (4, 2): ResponsePacketType.attclient_read_by_type, @@ -224,7 +226,6 @@ class UnknownMessageType(Exception): (4, 8): ResponsePacketType.attclient_read_long, (4, 9): ResponsePacketType.attclient_prepare_write, (4, 10): ResponsePacketType.attclient_execute_write, - (5, 0): ResponsePacketType.sm_encrypt_start, (5, 1): ResponsePacketType.sm_set_bondable_mode, (5, 2): ResponsePacketType.sm_delete_bonding, @@ -232,7 +233,6 @@ class UnknownMessageType(Exception): (5, 4): ResponsePacketType.sm_passkey_entry, (5, 5): ResponsePacketType.sm_get_bonds, (5, 6): ResponsePacketType.sm_set_oob_data, - (6, 0): ResponsePacketType.gap_set_privacy_flags, (6, 1): ResponsePacketType.gap_set_mode, (6, 2): ResponsePacketType.gap_discover, @@ -244,7 +244,6 @@ class UnknownMessageType(Exception): (6, 8): ResponsePacketType.gap_set_adv_parameters, (6, 9): ResponsePacketType.gap_set_adv_data, (6, 10): ResponsePacketType.gap_set_directed_connectable_mode, - (7, 0): ResponsePacketType.hardware_io_port_config_irq, (7, 1): ResponsePacketType.hardware_set_soft_timer, (7, 2): ResponsePacketType.hardware_adc_read, @@ -259,7 +258,6 @@ class UnknownMessageType(Exception): (7, 11): ResponsePacketType.hardware_i2c_write, (7, 12): ResponsePacketType.hardware_set_txpower, (7, 13): ResponsePacketType.hardware_timer_comparator, - (8, 0): ResponsePacketType.test_phy_tx, (8, 1): ResponsePacketType.test_phy_rx, (8, 2): ResponsePacketType.test_phy_reset, @@ -277,19 +275,15 @@ class UnknownMessageType(Exception): (0, 3): EventPacketType.system_endpoint_watermark_tx, (0, 4): EventPacketType.system_script_failure, (0, 5): EventPacketType.system_no_license_key, - (1, 0): EventPacketType.flash_ps_key, - (2, 0): EventPacketType.attributes_value, (2, 1): EventPacketType.attributes_user_read_request, (2, 2): EventPacketType.attributes_status, - (3, 0): EventPacketType.connection_status, (3, 1): EventPacketType.connection_version_ind, (3, 2): EventPacketType.connection_feature_ind, (3, 3): EventPacketType.connection_raw_rx, (3, 4): EventPacketType.connection_disconnected, - (4, 0): EventPacketType.attclient_indicated, (4, 1): EventPacketType.attclient_procedure_completed, (4, 2): EventPacketType.attclient_group_found, @@ -297,16 +291,13 @@ class UnknownMessageType(Exception): (4, 4): EventPacketType.attclient_find_information_found, (4, 5): EventPacketType.attclient_attribute_value, (4, 6): EventPacketType.attclient_read_multiple_response, - (5, 0): EventPacketType.sm_smp_data, (5, 1): EventPacketType.sm_bonding_fail, (5, 2): EventPacketType.sm_passkey_display, (5, 3): EventPacketType.sm_passkey_request, (5, 4): EventPacketType.sm_bond_status, - (6, 0): EventPacketType.gap_scan_response, (6, 1): EventPacketType.gap_mode_changed, - (7, 0): EventPacketType.hardware_io_port_status, (7, 1): EventPacketType.hardware_soft_timer, (7, 2): EventPacketType.hardware_adc_result, @@ -317,6 +308,7 @@ class BGLib(object): """ Modified version of jrowberg's BGLib implementation. """ + def __init__(self): self.buffer = [] self.expected_length = 0 @@ -350,138 +342,124 @@ def parse_byte(self, new_byte): # Convert from str or bytes to an integer for comparison new_byte = ord(new_byte) - if (len(self.buffer) == 0 and - new_byte in [self._ble_event, self._ble_response, - self._wifi_event, self._wifi_response]): + if len(self.buffer) == 0 and new_byte in [ + self._ble_event, + self._ble_response, + self._wifi_event, + self._wifi_response, + ]: self.buffer.append(new_byte) elif len(self.buffer) == 1: self.buffer.append(new_byte) - self.expected_length = ( - 4 + (self.buffer[0] & 0x07) + self.buffer[1]) + self.expected_length = 4 + (self.buffer[0] & 0x07) + self.buffer[1] elif len(self.buffer) > 1: self.buffer.append(new_byte) - if (self.expected_length > 0 and - len(self.buffer) == self.expected_length): + if self.expected_length > 0 and len(self.buffer) == self.expected_length: packet = self.buffer self.buffer = [] return packet return None - def _decode_response_packet(self, packet_class, packet_command, payload, - payload_length): - packet_type = RESPONSE_PACKET_MAPPING.get( - (packet_class, packet_command)) + def _decode_response_packet( + self, packet_class, packet_command, payload, payload_length + ): + packet_type = RESPONSE_PACKET_MAPPING.get((packet_class, packet_command)) if packet_type is None: raise UnknownMessageType( - "Packet class %d and command %d is not recognized" % - (packet_class, packet_command)) + "Packet class %d and command %d is not recognized" + % (packet_class, packet_command) + ) response = {} if packet_type == ResponsePacketType.system_address_get: - address = unpack('<6B', payload[:6]) - response = { - 'address': address - } + address = unpack("<6B", payload[:6]) + response = {"address": address} elif packet_type == ResponsePacketType.system_reg_read: - address, value =\ - unpack(''] - }, - 'mtu': { - 'patterns': [ - r'MTU was exchanged successfully: (\d+)' - ] - } + "connect": {"patterns": [r"Connection successful.*\[LE\]>"]}, + "mtu": {"patterns": [r"MTU was exchanged successfully: (\d+)"]}, } for event in self._event_vector.values(): @@ -114,18 +107,21 @@ def __init__(self, connection, parent_aliveness): event["callback"] = [] def run(self): - items = sorted(itertools.chain.from_iterable( - [[(pattern, event) - for pattern in event["patterns"]] - for event in self._event_vector.values()]) + items = sorted( + itertools.chain.from_iterable( + [ + [(pattern, event) for pattern in event["patterns"]] + for event in self._event_vector.values() + ] + ) ) patterns = [item[0] for item in items] events = [item[1] for item in items] - log.info('Running...') + log.info("Running...") while self._parent_aliveness.is_set(): try: - event_index = self._connection.expect(patterns, timeout=.5) + event_index = self._connection.expect(patterns, timeout=0.5) except pexpect.TIMEOUT: continue except (NotConnectedError, pexpect.EOF): @@ -196,8 +192,14 @@ class GATTToolBackend(BLEBackend): Backend to pygatt that uses BlueZ's interactive gatttool CLI prompt. """ - def __init__(self, hci_device='hci0', gatttool_logfile=None, - cli_options=None, search_window_size=None, max_read=None): + def __init__( + self, + hci_device="hci0", + gatttool_logfile=None, + cli_options=None, + search_window_size=None, + max_read=None, + ): """ Initialize. @@ -212,8 +214,10 @@ def __init__(self, hci_device='hci0', gatttool_logfile=None, """ if is_windows(): - raise BLEError("The GATTToolBackend requires BlueZ, " - "which is not available in Windows") + raise BLEError( + "The GATTToolBackend requires BlueZ, " + "which is not available in Windows" + ) self._hci_device = hci_device self._cli_options = cli_options @@ -265,39 +269,32 @@ def start(self, reset_on_start=True, initialization_timeout=3): self.reset() # Start gatttool interactive session for device - args = [ - 'gatttool', - self._cli_options, - '-i', - self._hci_device, - '-I' - ] - gatttool_cmd = ' '.join([arg for arg in args if arg]) - log.debug('gatttool_cmd=%s', gatttool_cmd) + args = ["gatttool", self._cli_options, "-i", self._hci_device, "-I"] + gatttool_cmd = " ".join([arg for arg in args if arg]) + log.debug("gatttool_cmd=%s", gatttool_cmd) if self._max_read: self._con = pexpect.spawn( - gatttool_cmd, logfile=self._gatttool_logfile, + gatttool_cmd, + logfile=self._gatttool_logfile, searchwindowsize=self._search_window_size, - maxread=self._max_read + maxread=self._max_read, ) else: self._con = pexpect.spawn( - gatttool_cmd, logfile=self._gatttool_logfile, + gatttool_cmd, + logfile=self._gatttool_logfile, searchwindowsize=self._search_window_size, ) # Wait for the interactive prompt - self._con.expect(r'\[LE\]>', timeout=initialization_timeout) + self._con.expect(r"\[LE\]>", timeout=initialization_timeout) # Start the notification receiving thread self._receiver = GATTToolReceiver(self._con, self._running) self._receiver.daemon = True self._receiver.register_callback("disconnected", self._disconnect) for event in ["notification", "indication"]: - self._receiver.register_callback( - event, - self._handle_notification_string - ) + self._receiver.register_callback(event, self._handle_notification_string) self._receiver.start() def stop(self): @@ -308,14 +305,14 @@ def stop(self): """ self.disconnect(self._connected_device) if self._running.is_set(): - log.info('Stopping') + log.info("Stopping") self._running.clear() if self._con and self._con.isalive(): while True: if not self._con.isalive(): break - self.sendline('exit') + self.sendline("exit") time.sleep(0.1) self._con.close() self._con = None @@ -332,34 +329,36 @@ def scan(self, timeout=10, run_as_root=False): terminate cleanly, and may leave your Bluetooth adapter in a bad state. """ - cmd = 'hcitool -i %s lescan' % self._hci_device + cmd = "hcitool -i %s lescan" % self._hci_device if run_as_root: - cmd = 'sudo %s' % cmd + cmd = "sudo %s" % cmd log.info("Starting BLE scan") # "lescan" doesn't exit, so we're forcing a timeout self._scan = scan = pexpect.spawn(cmd, timeout=timeout) try: - scan.expect('foooooo') + scan.expect("foooooo") except pexpect.EOF: - before_eof = scan.before.decode('utf-8', 'replace') + before_eof = scan.before.decode("utf-8", "replace") if "No such device" in before_eof: message = "No BLE adapter found" elif "Set scan parameters failed: Input/output error" in before_eof: - message = ("BLE adapter requires reset after a scan as root" - "- call adapter.reset()") + message = ( + "BLE adapter requires reset after a scan as root" + "- call adapter.reset()" + ) else: message = "Unexpected error when scanning: %s" % before_eof log.error(message) raise BLEError(message) except pexpect.TIMEOUT: devices = {} - for line in scan.before.decode('utf-8', 'replace').split('\r\n'): - if 'sudo' in line: - raise BLEError("Enable passwordless sudo for 'hcitool' " - "before scanning") - match = re.match( - r'(([0-9A-Fa-f][0-9A-Fa-f]:?){6}) (\(?.+\)?)', line) + for line in scan.before.decode("utf-8", "replace").split("\r\n"): + if "sudo" in line: + raise BLEError( + "Enable passwordless sudo for 'hcitool' " "before scanning" + ) + match = re.match(r"(([0-9A-Fa-f][0-9A-Fa-f]:?){6}) (\(?.+\)?)", line) if match is not None: address = match.group(1) @@ -368,17 +367,12 @@ def scan(self, timeout=10, run_as_root=False): name = None if address in devices: - if (devices[address]['name'] is None) and (name is not - None): - log.info("Discovered name of %s as %s", - address, name) - devices[address]['name'] = name + if (devices[address]["name"] is None) and (name is not None): + log.info("Discovered name of %s as %s", address, name) + devices[address]["name"] = name else: log.info("Discovered %s (%s)", address, name) - devices[address] = { - 'address': address, - 'name': name - } + devices[address] = {"address": address, "name": name} log.info("Found %d BLE devices", len(devices)) return [device for device in devices.values()] finally: @@ -412,18 +406,25 @@ def kill(self): if self._scan.isalive(): self._scan.wait() except OSError: - log.error("Unable to gracefully stop the scan - " - "BLE adapter may need to be reset.") + log.error( + "Unable to gracefully stop the scan - " + "BLE adapter may need to be reset." + ) - def connect(self, address, timeout=DEFAULT_CONNECT_TIMEOUT_S, - address_type=BLEAddressType.public, auto_reconnect=False): - log.info('Connecting to %s with timeout=%s', address, timeout) - self.sendline('sec-level low') + def connect( + self, + address, + timeout=DEFAULT_CONNECT_TIMEOUT_S, + address_type=BLEAddressType.public, + auto_reconnect=False, + ): + log.info("Connecting to %s with timeout=%s", address, timeout) + self.sendline("sec-level low") self._address = address self._auto_reconnect = auto_reconnect try: - cmd = 'connect {0} {1}'.format(self._address, address_type.name) + cmd = "connect {0} {1}".format(self._address, address_type.name) with self._receiver.event("connect", timeout): self.sendline(cmd) except NotificationTimeout: @@ -437,34 +438,29 @@ def connect(self, address, timeout=DEFAULT_CONNECT_TIMEOUT_S, return self._connected_device def clear_bond(self, address=None): - """Use the 'bluetoothctl' program to erase a stored BLE bond. - """ - con = pexpect.spawn('bluetoothctl') + """Use the 'bluetoothctl' program to erase a stored BLE bond.""" + con = pexpect.spawn("bluetoothctl") try: con.expect("bluetooth", timeout=1) log.info("Clearing bond for %s", address) con.sendline("remove " + address.upper()) - con.expect( - ["Device has been removed", "# "], - timeout=.5 - ) + con.expect(["Device has been removed", "# "], timeout=0.5) except pexpect.TIMEOUT: - log.error("Unable to remove bonds for %s: %s", - address, con.before) + log.error("Unable to remove bonds for %s: %s", address, con.before) finally: con.close(True) log.info("Removed bonds for %s", address) def _disconnect(self, event): if self._connected_device is not None and self._auto_reconnect: - # this is called as a callback from the pexpect thread # the reconnection process has to be started in parallel, otherwise # the call is never finished log.info("Connection to %s lost. Reconnecting...", self._address) - reconnect_thread = threading.Thread(target=self.reconnect, - args=(self._connected_device, )) + reconnect_thread = threading.Thread( + target=self.reconnect, args=(self._connected_device,) + ) reconnect_thread.start() else: try: @@ -475,8 +471,7 @@ def _disconnect(self, event): @at_most_one_device def reconnect(self, timeout=DEFAULT_CONNECT_TIMEOUT_S): while self._auto_reconnect: - log.info("Connecting to %s with timeout=%s", self._address, - timeout) + log.info("Connecting to %s with timeout=%s", self._address, timeout) try: cmd = "connect" with self._receiver.event("connect", timeout): @@ -486,10 +481,12 @@ def reconnect(self, timeout=DEFAULT_CONNECT_TIMEOUT_S): log.info("Connection to %s reestablished.") break # finished reconnecting except NotificationTimeout: - message = ("Timed out connecting to {0} after {1} seconds. " - "Retrying in {2} seconds".format( - self._address, timeout, - DEFAULT_RECONNECT_DELAY)) + message = ( + "Timed out connecting to {0} after {1} seconds. " + "Retrying in {2} seconds".format( + self._address, timeout, DEFAULT_RECONNECT_DELAY + ) + ) log.info(message) time.sleep(DEFAULT_RECONNECT_DELAY) @@ -497,28 +494,26 @@ def reconnect(self, timeout=DEFAULT_CONNECT_TIMEOUT_S): def disconnect(self, *args, **kwargs): self._auto_reconnect = False # disables any running reconnection if not self._receiver.is_set("disconnected"): - self.sendline('disconnect') + self.sendline("disconnect") self._connected_device = None # TODO maybe call a disconnected callback on the device instance, so the # device knows if it was asynchronously disconnected? @at_most_one_device def bond(self, *args, **kwargs): - log.info('Bonding') - self.sendline('sec-level medium') + log.info("Bonding") + self.sendline("sec-level medium") def _save_charecteristic_callback(self, event): match = event["match"] try: value_handle = int(match.group(2), 16) - char_uuid = match.group(3).strip().decode('ascii') + char_uuid = match.group(3).strip().decode("ascii") self._characteristics[UUID(char_uuid)] = Characteristic( char_uuid, value_handle ) log.debug( - "Found characteristic %s, value handle: 0x%x", - char_uuid, - value_handle + "Found characteristic %s, value handle: 0x%x", char_uuid, value_handle ) except AttributeError: pass @@ -530,11 +525,11 @@ def discover_characteristics(self, timeout=5): "discover", self._save_charecteristic_callback, ) - self.sendline('characteristics') + self.sendline("characteristics") max_time = time.time() + timeout while not self._characteristics and time.time() < max_time: - time.sleep(.5) + time.sleep(0.5) # Sleep one extra second in case we caught characteristic # in the middle @@ -548,13 +543,14 @@ def discover_characteristics(self, timeout=5): def _handle_notification_string(self, event): msg = event["after"] if not msg: - log.warn("Blank message received in notification, ignored") + log.warning("Blank message received in notification, ignored") return - match_obj = re.match(r'.* handle = (0x[0-9a-f]+) value:(.*)', - msg.decode('utf-8')) + match_obj = re.match( + r".* handle = (0x[0-9a-f]+) value:(.*)", msg.decode("utf-8") + ) if match_obj is None: - log.warn("Unable to parse notification string, ignoring: %s", msg) + log.warning("Unable to parse notification string, ignoring: %s", msg) return handle = int(match_obj.group(1), 16) @@ -563,8 +559,7 @@ def _handle_notification_string(self, event): self._connected_device.receive_notification(handle, values) @at_most_one_device - def char_write_handle(self, handle, value, wait_for_response=True, - timeout=30): + def char_write_handle(self, handle, value, wait_for_response=True, timeout=30): """ Writes a value to a given characteristic handle. @@ -574,13 +569,13 @@ def char_write_handle(self, handle, value, wait_for_response=True, false, sends a command and expects no acknowledgement from the device. """ - cmd = 'char-write-{0} 0x{1:02x} {2}'.format( - 'req' if wait_for_response else 'cmd', + cmd = "char-write-{0} 0x{1:02x} {2}".format( + "req" if wait_for_response else "cmd", handle, - ''.join("{0:02x}".format(byte) for byte in value), + "".join("{0:02x}".format(byte) for byte in value), ) - log.debug('Sending cmd=%s', cmd) + log.debug("Sending cmd=%s", cmd) if wait_for_response: try: with self._receiver.event("char_written", timeout=timeout): @@ -591,7 +586,7 @@ def char_write_handle(self, handle, value, wait_for_response=True, else: self.sendline(cmd) - log.info('Sent cmd=%s', cmd) + log.info("Sent cmd=%s", cmd) @at_most_one_device def char_read(self, uuid, timeout=1): @@ -603,7 +598,7 @@ def char_read(self, uuid, timeout=1): :rtype: bytearray """ with self._receiver.event("value", timeout=timeout): - self.sendline('char-read-uuid %s' % uuid) + self.sendline("char-read-uuid %s" % uuid) rval = self._receiver.last_value("value", "after").split()[1:] return bytearray([int(x, 16) for x in rval]) @@ -617,18 +612,17 @@ def char_read_handle(self, handle, timeout=4): :rtype: bytearray """ with self._receiver.event("value/descriptor", timeout=timeout): - self.sendline('char-read-hnd 0x{0:02x}'.format(handle)) - rval = self._receiver.last_value("value/descriptor", "after" - ).split()[1:] + self.sendline("char-read-hnd 0x{0:02x}".format(handle)) + rval = self._receiver.last_value("value/descriptor", "after").split()[1:] return bytearray([int(x, 16) for x in rval]) @at_most_one_device def exchange_mtu(self, mtu, timeout=1): - cmd = 'mtu {}'.format(mtu) + cmd = "mtu {}".format(mtu) - log.debug('Requesting MTU: {}'.format(mtu)) + log.debug("Requesting MTU: {}".format(mtu)) - with self._receiver.event('mtu', timeout=timeout): + with self._receiver.event("mtu", timeout=timeout): self.sendline(cmd) try: rval = self._receiver.last_value("mtu", "after").split()[-1] @@ -636,11 +630,10 @@ def exchange_mtu(self, mtu, timeout=1): log.error('MTU exchange failed: "{}"'.format(rval)) raise - log.debug('MTU exhange successful: {}'.format(rval)) + log.debug("MTU exhange successful: {}".format(rval)) return rval def reset(self): subprocess.Popen(["sudo", "systemctl", "restart", "bluetooth"]).wait() - subprocess.Popen([ - "sudo", "hciconfig", self._hci_device, "reset"]).wait() + subprocess.Popen(["sudo", "hciconfig", self._hci_device, "reset"]).wait() diff --git a/pygatt/device.py b/pygatt/device.py index 3b3615bd..bf81d843 100644 --- a/pygatt/device.py +++ b/pygatt/device.py @@ -22,6 +22,7 @@ class BLEDevice(object): implementations. This class is not meant to be instantiated directly - use BLEBackend.connect() to create one. """ + def __init__(self, address): """ Initialize. @@ -118,8 +119,9 @@ def char_write(self, uuid, value, wait_for_response=True): my_ble_device.char_write('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b', bytearray([0x00, 0xFF])) """ - return self.char_write_handle(self.get_handle(uuid), value, - wait_for_response=wait_for_response) + return self.char_write_handle( + self.get_handle(uuid), value, wait_for_response=wait_for_response + ) def char_write_handle(self, handle, value, wait_for_response=True): """ @@ -147,8 +149,9 @@ def char_write_long(self, uuid, value, wait_for_response=False): my_ble_device.char_write('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b', bytearray([0x00, 0xFF])) """ - return self.char_write_long_handle(self.get_handle(uuid), value, - wait_for_response=wait_for_response) + return self.char_write_long_handle( + self.get_handle(uuid), value, wait_for_response=wait_for_response + ) def char_write_long_handle(self, handle, value, wait_for_response=False): """ @@ -186,9 +189,7 @@ def _notification_handles(self, uuid): return value_handle, characteristic_config_handle - def subscribe(self, uuid, callback=None, indication=False, - wait_for_response=True): - + def subscribe(self, uuid, callback=None, indication=False, wait_for_response=True): """ Enable notifications or indications for a characteristic and register a callback function to be called whenever a new value arrives. @@ -202,14 +203,9 @@ def subscribe(self, uuid, callback=None, indication=False, """ - value_handle, characteristic_config_handle = ( - self._notification_handles(uuid) - ) + value_handle, characteristic_config_handle = self._notification_handles(uuid) - properties = bytearray([ - 0x2 if indication else 0x1, - 0x0 - ]) + properties = bytearray([0x2 if indication else 0x1, 0x0]) with self._lock: if callback is not None: @@ -219,7 +215,7 @@ def subscribe(self, uuid, callback=None, indication=False, self.char_write_handle( characteristic_config_handle, properties, - wait_for_response=wait_for_response + wait_for_response=wait_for_response, ) log.info("Subscribed to uuid=%s", uuid) self._subscribed_handlers[value_handle] = properties @@ -231,31 +227,29 @@ def unsubscribe(self, uuid, wait_for_response=True): """ Disable notification for a characteristic and de-register the callback. """ - value_handle, characteristic_config_handle = ( - self._notification_handles(uuid) - ) + value_handle, characteristic_config_handle = self._notification_handles(uuid) properties = bytearray([0x0, 0x0]) with self._lock: if uuid in self._subscribed_uuids: - del(self._subscribed_uuids[uuid]) + del self._subscribed_uuids[uuid] if value_handle in self._callbacks: - del(self._callbacks[value_handle]) + del self._callbacks[value_handle] if value_handle in self._subscribed_handlers: - del(self._subscribed_handlers[value_handle]) + del self._subscribed_handlers[value_handle] self.char_write_handle( characteristic_config_handle, properties, - wait_for_response=wait_for_response + wait_for_response=wait_for_response, ) log.info("Unsubscribed from uuid=%s", uuid) else: log.debug("Already unsubscribed from uuid=%s", uuid) - def subscribe_handle(self, handle, callback=None, indication=False, - wait_for_response=True): - + def subscribe_handle( + self, handle, callback=None, indication=False, wait_for_response=True + ): """ Like subscribe() but using handle instead of uuid. @@ -264,10 +258,7 @@ def subscribe_handle(self, handle, callback=None, indication=False, value_handle = handle characteristic_config_handle = value_handle + 1 - properties = bytearray([ - 0x2 if indication else 0x1, - 0x0 - ]) + properties = bytearray([0x2 if indication else 0x1, 0x0]) with self._lock: if callback is not None: @@ -277,7 +268,7 @@ def subscribe_handle(self, handle, callback=None, indication=False, self.char_write_handle( characteristic_config_handle, properties, - wait_for_response=wait_for_response + wait_for_response=wait_for_response, ) log.info("Subscribed to handle=0x%04x", value_handle) self._subscribed_handlers[value_handle] = properties @@ -297,20 +288,17 @@ def unsubscribe_handle(self, handle, wait_for_response=True): with self._lock: if value_handle in self._callbacks: - del(self._callbacks[value_handle]) + del self._callbacks[value_handle] if value_handle in self._subscribed_handlers: - del(self._subscribed_handlers[value_handle]) + del self._subscribed_handlers[value_handle] self.char_write_handle( characteristic_config_handle, properties, - wait_for_response=wait_for_response + wait_for_response=wait_for_response, ) log.info("Unsubscribed from handle=0x%04x", value_handle) else: - log.debug( - "Already unsubscribed from handle=0x%04x", - value_handle - ) + log.debug("Already unsubscribed from handle=0x%04x", value_handle) def get_handle(self, char_uuid): """ @@ -342,8 +330,9 @@ def receive_notification(self, handle, value): to all registered callbacks. """ - log.info('Received notification on handle=0x%x, value=0x%s', - handle, hexlify(value)) + log.info( + "Received notification on handle=0x%x, value=0x%s", handle, hexlify(value) + ) with self._lock: if handle in self._callbacks: for callback in self._callbacks[handle]: @@ -365,19 +354,14 @@ def resubscribe_all(self): """ for uuid in self._subscribed_uuids: - value_handle, characteristic_config_handle = ( - self._notification_handles(uuid) + value_handle, characteristic_config_handle = self._notification_handles( + uuid ) - properties = bytearray([ - 0x2 if self._subscribed_uuids[uuid] else 0x1, - 0x0 - ]) + properties = bytearray([0x2 if self._subscribed_uuids[uuid] else 0x1, 0x0]) with self._lock: self.char_write_handle( - characteristic_config_handle, - properties, - wait_for_response=True + characteristic_config_handle, properties, wait_for_response=True ) log.info("Resubscribed to uuid=%s", uuid) diff --git a/pygatt/exceptions.py b/pygatt/exceptions.py index ccd44339..0a121bb5 100644 --- a/pygatt/exceptions.py +++ b/pygatt/exceptions.py @@ -5,6 +5,7 @@ class BLEError(Exception): """Exception class for pygatt.""" + pass diff --git a/requirements.txt b/requirements.txt index 42182d0e..e12b7bdb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,9 +6,9 @@ # License:: Apache License, Version 2.0 -flake8==2.1.0 -pylint==1.2.1 -coverage==5.5 -nose==1.3.7 +flake8==7.1.0 +pylint==3.2.6 +pytest==8.3.2 +coverage==7.6.0 mock==3.0.5 -funcsigs==1.0.2 +#funcsigs==1.0.2 diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 6e2d07f9..00000000 --- a/setup.cfg +++ /dev/null @@ -1,4 +0,0 @@ -[nosetests] -cover-tests = 0 -cover-package = pygatt -verbosity = 2 diff --git a/setup.py b/setup.py index f0c08790..64d0c6b2 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ long_description=readme + "\n\n" + changelog, url="https://github.com/peplin/pygatt", install_requires=["pyserial", "enum-compat"], - setup_requires=["coverage == 5.5", "nose == 1.3.7"], + setup_requires=["coverage == 5.5", "pytest == 8.3.2"], extras_require={ "GATTTOOL": ["pexpect"], }, @@ -37,7 +37,6 @@ "License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: MIT License", "Programming Language :: Python", - "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", ), ) diff --git a/tests/bgapi/mocker.py b/tests/bgapi/mocker.py index 9a2e94e4..e3e0bb8e 100644 --- a/tests/bgapi/mocker.py +++ b/tests/bgapi/mocker.py @@ -15,15 +15,14 @@ def uuid_to_bytearray(uuid_str): Returns a bytearray containing the UUID. """ - return unhexlify(uuid_str.replace('-', '')) + return unhexlify(uuid_str.replace("-", "")) class MockBGAPISerialDevice(object): - def __init__(self, serial_port_name='mock'): + def __init__(self, serial_port_name="mock"): self.serial_port_name = serial_port_name self.mocked_serial = SerialMock(self.serial_port_name, 0.25) - self.patcher = patch('serial.Serial', - return_value=self.mocked_serial).start() + self.patcher = patch("serial.Serial", return_value=self.mocked_serial).start() def stop(self): self.patcher.stop() @@ -31,22 +30,21 @@ def stop(self): @staticmethod def _get_connection_status_flags_byte(flags): flags_byte = 0x00 - if 'connected' in flags: + if "connected" in flags: flags_byte |= 0x01 - if 'encrypted' in flags: + if "encrypted" in flags: flags_byte |= 0x02 - if 'completed' in flags: + if "completed" in flags: flags_byte |= 0x04 - if 'parameters_change' in flags: + if "parameters_change" in flags: flags_byte |= 0x08 return flags_byte - def stage_disconnected_by_remote( - self, connection_handle=0x00): + def stage_disconnected_by_remote(self, connection_handle=0x00): # Stage ble_evt_connection_disconnected (terminated by remote user) self.mocked_serial.stage_output( - BGAPIPacketBuilder.connection_disconnected( - connection_handle, 0x0213)) + BGAPIPacketBuilder.connection_disconnected(connection_handle, 0x0213) + ) def stage_disconnect_packets(self, connected, fail, connection_handle=0x00): if connected: @@ -55,17 +53,17 @@ def stage_disconnect_packets(self, connected, fail, connection_handle=0x00): # Stage ble_rsp_connection_disconnect (success) self.mocked_serial.stage_output( - BGAPIPacketBuilder.connection_disconnect( - connection_handle, 0x0000)) + BGAPIPacketBuilder.connection_disconnect(connection_handle, 0x0000) + ) # Stage ble_evt_connection_disconnected (success by local user) self.mocked_serial.stage_output( - BGAPIPacketBuilder.connection_disconnected( - connection_handle, 0x0000)) + BGAPIPacketBuilder.connection_disconnected(connection_handle, 0x0000) + ) else: # not connected always fails # Stage ble_rsp_connection_disconnect (fail, not connected) self.mocked_serial.stage_output( - BGAPIPacketBuilder.connection_disconnect( - connection_handle, 0x0186)) + BGAPIPacketBuilder.connection_disconnect(connection_handle, 0x0186) + ) def stage_run_packets(self, connection_handle=0x00): self.mocked_serial.stage_output(BGAPIPacketBuilder.system_boot()) @@ -74,140 +72,154 @@ def stage_run_packets(self, connection_handle=0x00): # Stage ble_rsp_gap_set_mode (success) self.mocked_serial.stage_output(BGAPIPacketBuilder.gap_set_mode(0x0000)) # Stage ble_rsp_gap_end_procedure (fail, device in wrong state) - self.mocked_serial.stage_output( - BGAPIPacketBuilder.gap_end_procedure(0x0181)) + self.mocked_serial.stage_output(BGAPIPacketBuilder.gap_end_procedure(0x0181)) # Stage ble_rsp_sm_set_bondable_mode (always success) - self.mocked_serial.stage_output( - BGAPIPacketBuilder.sm_set_bondable_mode()) + self.mocked_serial.stage_output(BGAPIPacketBuilder.sm_set_bondable_mode()) def stage_connect_packets(self, addr, flags, connection_handle=0x00): - self.mocked_serial.stage_output( - BGAPIPacketBuilder.sm_set_bondable_mode()) + self.mocked_serial.stage_output(BGAPIPacketBuilder.sm_set_bondable_mode()) # Stage ble_rsp_gap_connect_direct (success) self.mocked_serial.stage_output( - BGAPIPacketBuilder.gap_connect_direct(connection_handle, 0x0000)) + BGAPIPacketBuilder.gap_connect_direct(connection_handle, 0x0000) + ) # Stage ble_evt_connection_status flags_byte = self._get_connection_status_flags_byte(flags) - self.mocked_serial.stage_output(BGAPIPacketBuilder.connection_status( - addr, flags_byte, connection_handle, 0, - 0x0014, 0x0006, 0x0000, 0xFF)) + self.mocked_serial.stage_output( + BGAPIPacketBuilder.connection_status( + addr, flags_byte, connection_handle, 0, 0x0014, 0x0006, 0x0000, 0xFF + ) + ) - def stage_get_rssi_packets(self, connection_handle=0x00, - rssi=-80): + def stage_get_rssi_packets(self, connection_handle=0x00, rssi=-80): # Stage ble_rsp_connection_get_rssi self.mocked_serial.stage_output( - BGAPIPacketBuilder.connection_get_rssi(connection_handle, rssi)) + BGAPIPacketBuilder.connection_get_rssi(connection_handle, rssi) + ) - def stage_bond_packets(self, addr, flags, - connection_handle=0x00, bond_handle=0x01): + def stage_bond_packets(self, addr, flags, connection_handle=0x00, bond_handle=0x01): # Stage ble_rsp_sm_set_bondable_mode (always success) - self.mocked_serial.stage_output( - BGAPIPacketBuilder.sm_set_bondable_mode()) + self.mocked_serial.stage_output(BGAPIPacketBuilder.sm_set_bondable_mode()) # Stage ble_rsp_sm_encrypt_start (success) - self.mocked_serial.stage_output(BGAPIPacketBuilder.sm_encrypt_start( - connection_handle, 0x0000)) + self.mocked_serial.stage_output( + BGAPIPacketBuilder.sm_encrypt_start(connection_handle, 0x0000) + ) # Stage ble_evt_sm_bond_status - self.mocked_serial.stage_output(BGAPIPacketBuilder.sm_bond_status( - bond_handle, 0x00, 0x00, 0x00)) + self.mocked_serial.stage_output( + BGAPIPacketBuilder.sm_bond_status(bond_handle, 0x00, 0x00, 0x00) + ) # Stage ble_evt_connection_status flags_byte = self._get_connection_status_flags_byte(flags) - self.mocked_serial.stage_output(BGAPIPacketBuilder.connection_status( - addr, flags_byte, connection_handle, 0, - 0x0014, 0x0006, 0x0000, 0xFF)) + self.mocked_serial.stage_output( + BGAPIPacketBuilder.connection_status( + addr, flags_byte, connection_handle, 0, 0x0014, 0x0006, 0x0000, 0xFF + ) + ) - def stage_clear_bonds_packets( - self, bonds, disconnects=False): + def stage_clear_bonds_packets(self, bonds, disconnects=False): """bonds -- list of 8-bit integer bond handles""" if disconnects: self.stage_disconnected_by_remote() # Stage ble_rsp_get_bonds - self.mocked_serial.stage_output( - BGAPIPacketBuilder.sm_get_bonds(len(bonds))) + self.mocked_serial.stage_output(BGAPIPacketBuilder.sm_get_bonds(len(bonds))) # Stage ble_evt_sm_bond_status (bond handle) for b in bonds: if disconnects: self.stage_disconnected_by_remote() - self.mocked_serial.stage_output(BGAPIPacketBuilder.sm_bond_status( - b, 0x00, 0x00, 0x00)) + self.mocked_serial.stage_output( + BGAPIPacketBuilder.sm_bond_status(b, 0x00, 0x00, 0x00) + ) # Stage ble_rsp_sm_delete_bonding (success) for b in bonds: if disconnects: self.stage_disconnected_by_remote() self.mocked_serial.stage_output( - BGAPIPacketBuilder.sm_delete_bonding(0x0000)) + BGAPIPacketBuilder.sm_delete_bonding(0x0000) + ) def stage_scan_packets(self, scan_responses=[]): # Stage ble_rsp_gap_set_scan_parameters (success) self.mocked_serial.stage_output( - BGAPIPacketBuilder.gap_set_scan_parameters(0x0000)) + BGAPIPacketBuilder.gap_set_scan_parameters(0x0000) + ) # Stage ble_rsp_gap_discover (success) - self.mocked_serial.stage_output( - BGAPIPacketBuilder.gap_discover(0x0000)) + self.mocked_serial.stage_output(BGAPIPacketBuilder.gap_discover(0x0000)) for srp in scan_responses: # Stage ble_evt_gap_scan_response self.mocked_serial.stage_output( BGAPIPacketBuilder.gap_scan_response( - srp['rssi'], srp['packet_type'], srp['bd_addr'], - srp['addr_type'], srp['bond'], - [len(srp['data'])+1]+srp['data'])) + srp["rssi"], + srp["packet_type"], + srp["bd_addr"], + srp["addr_type"], + srp["bond"], + [len(srp["data"]) + 1] + srp["data"], + ) + ) # Stage ble_rsp_gap_end_procedure (success) - self.mocked_serial.stage_output( - BGAPIPacketBuilder.gap_end_procedure(0x0000)) + self.mocked_serial.stage_output(BGAPIPacketBuilder.gap_end_procedure(0x0000)) def stage_discover_characteristics_packets( - self, uuid_handle_list, connection_handle=0x00): + self, uuid_handle_list, connection_handle=0x00 + ): # Stage ble_rsp_attclient_find_information (success) self.mocked_serial.stage_output( - BGAPIPacketBuilder.attclient_find_information( - connection_handle, 0x0000)) + BGAPIPacketBuilder.attclient_find_information(connection_handle, 0x0000) + ) for i in range(0, len(uuid_handle_list) // 2): - uuid = uuid_to_bytearray(uuid_handle_list[2*i]) - handle = uuid_handle_list[2*i + 1] + uuid = uuid_to_bytearray(uuid_handle_list[2 * i]) + handle = uuid_handle_list[2 * i + 1] # Stage ble_evt_attclient_find_information_found u = [len(uuid) + 1] self.mocked_serial.stage_output( BGAPIPacketBuilder.attclient_find_information_found( - connection_handle, handle, - (u+list(reversed(bytes(uuid)))))) + connection_handle, handle, (u + list(reversed(bytes(uuid)))) + ) + ) # Stage ble_evt_attclient_procedure_completed (success) self.mocked_serial.stage_output( BGAPIPacketBuilder.attclient_procedure_completed( - connection_handle, 0x0000, 0xFFFF)) + connection_handle, 0x0000, 0xFFFF + ) + ) def stage_char_read_packets( - self, att_handle, att_type, value, connection_handle=0x00): + self, att_handle, att_type, value, connection_handle=0x00 + ): # Stage ble_rsp_attclient_read_by_handle (success) self.mocked_serial.stage_output( - BGAPIPacketBuilder.attclient_read_by_handle( - connection_handle, 0x0000)) + BGAPIPacketBuilder.attclient_read_by_handle(connection_handle, 0x0000) + ) # Stage ble_evt_attclient_attribute_value self.mocked_serial.stage_output( BGAPIPacketBuilder.attclient_attribute_value( - connection_handle, att_handle, att_type, [len(value)+1]+value)) + connection_handle, att_handle, att_type, [len(value) + 1] + value + ) + ) - def stage_char_write_packets( - self, handle, value, connection_handle=0x00): + def stage_char_write_packets(self, handle, value, connection_handle=0x00): # Stage ble_rsp_attclient_attribute_write (success) self.mocked_serial.stage_output( - BGAPIPacketBuilder.attclient_attribute_write( - connection_handle, 0x0000)) + BGAPIPacketBuilder.attclient_attribute_write(connection_handle, 0x0000) + ) # Stage ble_evt_attclient_procedure_completed self.mocked_serial.stage_output( BGAPIPacketBuilder.attclient_procedure_completed( - connection_handle, 0x0000, handle)) + connection_handle, 0x0000, handle + ) + ) - def stage_char_write_command_packets( - self, handle, value, connection_handle=0x00): + def stage_char_write_command_packets(self, handle, value, connection_handle=0x00): # Stage ble_rsp_attclient_attribute_write (success) self.mocked_serial.stage_output( - BGAPIPacketBuilder.attclient_write_command( - connection_handle, 0x0000)) + BGAPIPacketBuilder.attclient_write_command(connection_handle, 0x0000) + ) - def stage_indication_packets( - self, handle, packet_values, connection_handle=0x00): + def stage_indication_packets(self, handle, packet_values, connection_handle=0x00): # Stage ble_evt_attclient_attribute_value for value in packet_values: val = list(value) self.mocked_serial.stage_output( BGAPIPacketBuilder.attclient_attribute_value( - connection_handle, handle, 0x00, value=[len(val)+1]+val)) + connection_handle, handle, 0x00, value=[len(val) + 1] + val + ) + ) diff --git a/tests/bgapi/packets.py b/tests/bgapi/packets.py index 50edc03c..31f55d78 100644 --- a/tests/bgapi/packets.py +++ b/tests/bgapi/packets.py @@ -3,139 +3,195 @@ class BGAPIPacketBuilder(object): @staticmethod - def attclient_attribute_write( - connection_handle, return_code): + def attclient_attribute_write(connection_handle, return_code): # TODO create a Packet class to wrap this, where you pass in the various # values, then call .pack(). where do we unpack? could that be the same # class? - return pack('<4BBH', 0x00, 0x03, 0x04, 0x05, connection_handle, - return_code) + return pack("<4BBH", 0x00, 0x03, 0x04, 0x05, connection_handle, return_code) @staticmethod - def attclient_write_command( - connection_handle, return_code): - return pack('<4BBH', 0x00, 0x03, 0x04, 0x06, connection_handle, - return_code) + def attclient_write_command(connection_handle, return_code): + return pack("<4BBH", 0x00, 0x03, 0x04, 0x06, connection_handle, return_code) @staticmethod - def attclient_find_information( - connection_handle, return_code): - return pack('<4BBH', 0x00, 0x03, 0x04, 0x03, connection_handle, - return_code) + def attclient_find_information(connection_handle, return_code): + return pack("<4BBH", 0x00, 0x03, 0x04, 0x03, connection_handle, return_code) @staticmethod def attclient_read_by_handle(connection_handle, return_code): - return pack('<4BBH', 0x00, 0x03, 0x04, 0x04, connection_handle, - return_code) + return pack("<4BBH", 0x00, 0x03, 0x04, 0x04, connection_handle, return_code) @staticmethod def connection_disconnect(connection_handle, return_code): - return pack('<4BBH', 0x00, 0x03, 0x03, 0x00, connection_handle, - return_code) + return pack("<4BBH", 0x00, 0x03, 0x03, 0x00, connection_handle, return_code) @staticmethod def connection_get_rssi(connection_handle, rssi_value): - return pack('<4BBb', 0x00, 0x02, 0x03, 0x01, connection_handle, - rssi_value) + return pack("<4BBb", 0x00, 0x02, 0x03, 0x01, connection_handle, rssi_value) @staticmethod def gap_connect_direct(connection_handle, return_code): - return pack('<4BHB', 0x00, 0x03, 0x06, 0x03, return_code, - connection_handle) + return pack("<4BHB", 0x00, 0x03, 0x06, 0x03, return_code, connection_handle) @staticmethod def gap_discover(return_code): - return pack('<4BH', 0x00, 0x02, 0x06, 0x02, return_code) + return pack("<4BH", 0x00, 0x02, 0x06, 0x02, return_code) @staticmethod def gap_end_procedure(return_code): - return pack('<4BH', 0x00, 0x02, 0x06, 0x04, return_code) + return pack("<4BH", 0x00, 0x02, 0x06, 0x04, return_code) @staticmethod def gap_set_mode(return_code): - return pack('<4BH', 0x00, 0x02, 0x06, 0x01, return_code) + return pack("<4BH", 0x00, 0x02, 0x06, 0x01, return_code) @staticmethod def gap_set_scan_parameters(return_code): - return pack('<4BH', 0x00, 0x02, 0x06, 0x07, return_code) + return pack("<4BH", 0x00, 0x02, 0x06, 0x07, return_code) @staticmethod def sm_delete_bonding(return_code): - return pack('<4BH', 0x00, 0x02, 0x05, 0x02, return_code) + return pack("<4BH", 0x00, 0x02, 0x05, 0x02, return_code) @staticmethod def sm_encrypt_start(connection_handle, return_code): - return pack('<4BBH', 0x00, 0x03, 0x05, 0x00, connection_handle, - return_code) + return pack("<4BBH", 0x00, 0x03, 0x05, 0x00, connection_handle, return_code) @staticmethod def sm_get_bonds(num_bonds): - assert((num_bonds >= 0) and (num_bonds <= 8)) # hardware constraint - return pack('<4BB', 0x00, 0x01, 0x05, 0x05, num_bonds) + assert (num_bonds >= 0) and (num_bonds <= 8) # hardware constraint + return pack("<4BB", 0x00, 0x01, 0x05, 0x05, num_bonds) @staticmethod def sm_set_bondable_mode(): - return pack('<4B', 0x00, 0x00, 0x05, 0x01) + return pack("<4B", 0x00, 0x00, 0x05, 0x01) @staticmethod - def attclient_attribute_value( - connection_handle, att_handle, att_type, value): + def attclient_attribute_value(connection_handle, att_handle, att_type, value): # the first byte of value must be the length of value - assert((len(value) > 0) and (value[0] == len(value))) - return pack('<4BBHB%dB' % len(value), 0x80, 4 + len(value), - 0x04, 0x05, connection_handle, att_handle, att_type, - *bytearray(value)) + assert (len(value) > 0) and (value[0] == len(value)) + return pack( + "<4BBHB%dB" % len(value), + 0x80, + 4 + len(value), + 0x04, + 0x05, + connection_handle, + att_handle, + att_type, + *bytearray(value) + ) @staticmethod def attclient_find_information_found(connection_handle, chr_handle, uuid): # the first byte of uuid must be the length of uuid - assert((len(uuid) > 0) and (uuid[0] == len(uuid))) - return pack('<4BBH%dB' % len(uuid), 0x80, 3 + len(uuid), 0x04, - 0x04, connection_handle, chr_handle, *bytearray(uuid)) - - @staticmethod - def attclient_procedure_completed( - connection_handle, return_code, chr_handle): - return pack('<4BB2H', 0x80, 0x05, 0x04, 0x01, connection_handle, - return_code, chr_handle) - - @staticmethod - def connection_status(addr, flags, connection_handle, address_type, - connection_interval, timeout, latency, bonding): + assert (len(uuid) > 0) and (uuid[0] == len(uuid)) return pack( - '<4B2B6BB3HB', 0x80, 0x10, 0x03, 0x00, connection_handle, flags, - addr[0], addr[1], addr[2], addr[3], addr[4], addr[5], address_type, - connection_interval, timeout, latency, bonding) + "<4BBH%dB" % len(uuid), + 0x80, + 3 + len(uuid), + 0x04, + 0x04, + connection_handle, + chr_handle, + *bytearray(uuid) + ) + + @staticmethod + def attclient_procedure_completed(connection_handle, return_code, chr_handle): + return pack( + "<4BB2H", 0x80, 0x05, 0x04, 0x01, connection_handle, return_code, chr_handle + ) + + @staticmethod + def connection_status( + addr, + flags, + connection_handle, + address_type, + connection_interval, + timeout, + latency, + bonding, + ): + return pack( + "<4B2B6BB3HB", + 0x80, + 0x10, + 0x03, + 0x00, + connection_handle, + flags, + addr[0], + addr[1], + addr[2], + addr[3], + addr[4], + addr[5], + address_type, + connection_interval, + timeout, + latency, + bonding, + ) @staticmethod def connection_disconnected(connection_handle, return_code): - return pack('<4BBH', 0x80, 0x03, 0x03, 0x04, connection_handle, - return_code) + return pack("<4BBH", 0x80, 0x03, 0x03, 0x04, connection_handle, return_code) @staticmethod - def gap_scan_response( - rssi, packet_type, bd_addr, addr_type, bond, data): + def gap_scan_response(rssi, packet_type, bd_addr, addr_type, bond, data): # the first byte of data must be the length of data - assert((len(data) > 0) and (data[0] == len(data))) - return pack('<4Bb9B%dB' % len(data), 0x80, 10 + len(data), - 0x06, 0x00, rssi, packet_type, bd_addr[5], bd_addr[4], - bd_addr[3], bd_addr[2], bd_addr[1], bd_addr[0], addr_type, - bond, *data) + assert (len(data) > 0) and (data[0] == len(data)) + return pack( + "<4Bb9B%dB" % len(data), + 0x80, + 10 + len(data), + 0x06, + 0x00, + rssi, + packet_type, + bd_addr[5], + bd_addr[4], + bd_addr[3], + bd_addr[2], + bd_addr[1], + bd_addr[0], + addr_type, + bond, + *data + ) @staticmethod def system_boot(): - (major_version, minor_version, patch, build, linklayer_ver, - protocol_ver, hw_ver) = (1, 2, 3, 4, 5, 6, 7) - return pack('<4B5H2B', 0x80, 0x0C, 0x00, 0x00, major_version, - minor_version, patch, build, linklayer_ver, - protocol_ver, hw_ver) + ( + major_version, + minor_version, + patch, + build, + linklayer_ver, + protocol_ver, + hw_ver, + ) = (1, 2, 3, 4, 5, 6, 7) + return pack( + "<4B5H2B", + 0x80, + 0x0C, + 0x00, + 0x00, + major_version, + minor_version, + patch, + build, + linklayer_ver, + protocol_ver, + hw_ver, + ) @staticmethod def sm_bond_status(bond_handle, keysize, mitm, keys): - return pack('<4B4B', 0x80, 0x04, 0x05, 0x04, bond_handle, keysize, mitm, - keys) + return pack("<4B4B", 0x80, 0x04, 0x05, 0x04, bond_handle, keysize, mitm, keys) @staticmethod def sm_bonding_fail(connection_handle, return_code): - return pack('<4BBH', 0x80, 0x03, 0x05, 0x01, connection_handle, - return_code) + return pack("<4BBH", 0x80, 0x03, 0x05, 0x01, connection_handle, return_code) diff --git a/tests/bgapi/test_bgapi.py b/tests/bgapi/test_bgapi.py index f7c639b5..38e6fb18 100644 --- a/tests/bgapi/test_bgapi.py +++ b/tests/bgapi/test_bgapi.py @@ -1,14 +1,12 @@ from __future__ import print_function -from nose.tools import eq_, ok_ import mock import unittest import serial from pygatt.backends import BGAPIBackend -from pygatt.backends.bgapi.bgapi import (bgapi_address_to_hex, - MAX_CONNECTION_ATTEMPTS) +from pygatt.backends.bgapi.bgapi import bgapi_address_to_hex, MAX_CONNECTION_ATTEMPTS from pygatt.backends.bgapi.util import extract_vid_pid from pygatt.backends.bgapi.error_codes import get_return_message from pygatt.backends.bgapi import bglib @@ -21,19 +19,18 @@ class BGAPIBackendTests(unittest.TestCase): def setUp(self): self.mock_device = MockBGAPISerialDevice() self.backend = BGAPIBackend( - serial_port=self.mock_device.serial_port_name, - receive_queue_timeout=0.001) + serial_port=self.mock_device.serial_port_name, receive_queue_timeout=0.001 + ) self.address = [0x01, 0x23, 0x45, 0x67, 0x89, 0xAB] self.address_string = ":".join("%02x" % b for b in self.address) self.mock_device.stage_run_packets() - self.time_patcher = mock.patch('pygatt.backends.bgapi.bgapi.time') + self.time_patcher = mock.patch("pygatt.backends.bgapi.bgapi.time") self.time_patcher.start() - self.timeout_patcher = mock.patch( - 'pygatt.backends.bgapi.bgapi._timed_out') + self.timeout_patcher = mock.patch("pygatt.backends.bgapi.bgapi._timed_out") timed_out = self.timeout_patcher.start() timed_out.return_value = True @@ -50,8 +47,7 @@ def tearDown(self): self.backend.stop() def _connect(self): - self.mock_device.stage_connect_packets( - self.address, ['connected', 'completed']) + self.mock_device.stage_connect_packets(self.address, ["connected", "completed"]) return self.backend.connect(self.address_string) def test_connect(self): @@ -60,109 +56,155 @@ def test_connect(self): def test_connect_already_connected(self): device = self._connect() another_device = self.backend.connect(self.address_string) - eq_(device, another_device) + assert device == another_device def test_serial_port_connection_failure(self): self.mock_device.mocked_serial.read = mock.MagicMock() self.mock_device.mocked_serial.read.side_effect = ( - serial.serialutil.SerialException) + serial.serialutil.SerialException + ) with self.assertRaises(NotConnectedError): self.backend.start() def test_reset_and_reconnect(self): self.backend.stop() self.backend = BGAPIBackend( - serial_port=self.mock_device.serial_port_name, - receive_queue_timeout=0.001) + serial_port=self.mock_device.serial_port_name, receive_queue_timeout=0.001 + ) self.mock_device.mocked_serial.write = mock.MagicMock() self.mock_device.mocked_serial.read = mock.MagicMock() self.mock_device.mocked_serial.read.side_effect = [None] + [ - serial.serialutil.SerialException] * MAX_CONNECTION_ATTEMPTS + serial.serialutil.SerialException + ] * MAX_CONNECTION_ATTEMPTS with self.assertRaises(NotConnectedError): self.backend.start() - self.assertEquals(MAX_CONNECTION_ATTEMPTS + 1, - self.mock_device.mocked_serial.read.call_count) + self.assertEqual( + MAX_CONNECTION_ATTEMPTS + 1, self.mock_device.mocked_serial.read.call_count + ) self.assertTrue(self.mock_device.mocked_serial.write.called) def test_scan_and_get_devices_discovered(self): # Test scan scan_responses = [] addr_0 = [0x01, 0x23, 0x45, 0x67, 0x89, 0xAB] - scan_responses.append({ - 'rssi': -80, - 'packet_type': 0, - 'bd_addr': addr_0, - 'addr_type': 0x00, - 'bond': 0xFF, - 'data': [0x07, 0x09, ord('H'), ord('e'), ord('l'), - ord('l'), ord('o'), ord('!')] - }) + scan_responses.append( + { + "rssi": -80, + "packet_type": 0, + "bd_addr": addr_0, + "addr_type": 0x00, + "bond": 0xFF, + "data": [ + 0x07, + 0x09, + ord("H"), + ord("e"), + ord("l"), + ord("l"), + ord("o"), + ord("!"), + ], + } + ) self.mock_device.stage_scan_packets(scan_responses=scan_responses) - devs = self.backend.scan(timeout=.5) - eq_('Hello!', devs[0]['name']) - eq_(-80, devs[0]['rssi']) + devs = self.backend.scan(timeout=0.5) + assert "Hello!" == devs[0]["name"] + assert -80 == devs[0]["rssi"] def test_clear_bonds(self): # Test delete stored bonds - self.mock_device.stage_clear_bonds_packets( - [0x00, 0x01, 0x02, 0x03, 0x04]) + self.mock_device.stage_clear_bonds_packets([0x00, 0x01, 0x02, 0x03, 0x04]) self.backend.clear_bond() def test_clear_bonds_disconnect(self): """clear_bonds shouldn't abort if disconnected.""" # Test delete stored bonds self.mock_device.stage_clear_bonds_packets( - [0x00, 0x01, 0x02, 0x03, 0x04], disconnects=True) + [0x00, 0x01, 0x02, 0x03, 0x04], disconnects=True + ) self.backend.clear_bond() class UsbInfoStringParsingTests(unittest.TestCase): - def test_weird_platform(self): vid, pid = extract_vid_pid("USB VID_2458 PID_0001") - eq_(0x2458, vid) - eq_(1, pid) + assert 0x2458 == vid + assert 1 == pid def test_linux(self): vid, pid = extract_vid_pid("USB VID:PID=2458:0001 SNR=1") - eq_(0x2458, vid) - eq_(1, pid) + assert 0x2458 == vid + assert 1 == pid def test_mac(self): vid, pid = extract_vid_pid("USB VID:PID=2458:1 SNR=1") - eq_(0x2458, vid) - eq_(1, pid) + assert 0x2458 == vid + assert 1 == pid def test_invalid(self): - eq_(None, extract_vid_pid("2458:1")) + assert extract_vid_pid("2458:1") is None class ReturnCodeTests(unittest.TestCase): - def test_unrecognized_return_code(self): - ok_(get_return_message(123123123123123) is not None) + assert get_return_message(123123123123123) is not None class BGAPIAddressToHexTests(unittest.TestCase): - def test_convert(self): bgapi_address = bytearray([21, 19, 11, 210, 2, 97]) - eq_("61:02:D2:0B:13:15", bgapi_address_to_hex(bgapi_address)) + assert "61:02:D2:0B:13:15" == bgapi_address_to_hex(bgapi_address) class DecodePacketTests(unittest.TestCase): - def setUp(self): self.lib = bglib.BGLib() def test_decode_scan_packet(self): - data = [128, 34, 6, 0, 166, 0, 21, 19, 11, 210, 2, 97, 1, 255, 23, 2, 1, - 6, 19, 255, 76, 0, 12, 14, 0, 100, 39, 38, 61, 167, 226, 128, - 135, 0, 76, 200, 60, 78] + data = [ + 128, + 34, + 6, + 0, + 166, + 0, + 21, + 19, + 11, + 210, + 2, + 97, + 1, + 255, + 23, + 2, + 1, + 6, + 19, + 255, + 76, + 0, + 12, + 14, + 0, + 100, + 39, + 38, + 61, + 167, + 226, + 128, + 135, + 0, + 76, + 200, + 60, + 78, + ] packet_type, packet = self.lib.decode_packet(data) - eq_(bglib.EventPacketType.gap_scan_response, packet_type) - eq_(bytearray([21, 19, 11, 210, 2, 97]), packet['sender']) + assert bglib.EventPacketType.gap_scan_response == packet_type + assert bytearray([21, 19, 11, 210, 2, 97]) == packet["sender"] def test_decode_invalid(self): with self.assertRaises(bglib.UnknownMessageType): diff --git a/tests/bgapi/test_device.py b/tests/bgapi/test_device.py index 548175fd..c0af039d 100644 --- a/tests/bgapi/test_device.py +++ b/tests/bgapi/test_device.py @@ -1,7 +1,7 @@ from __future__ import print_function -from nose.tools import eq_, raises import mock +import pytest import unittest from uuid import UUID @@ -15,19 +15,17 @@ class BGAPIDeviceTests(unittest.TestCase): def setUp(self): self.mock_device = MockBGAPISerialDevice() - self.backend = BGAPIBackend( - serial_port=self.mock_device.serial_port_name) + self.backend = BGAPIBackend(serial_port=self.mock_device.serial_port_name) self.address = [0x01, 0x23, 0x45, 0x67, 0x89, 0xAB] self.address_string = ":".join("%02x" % b for b in self.address) self.mock_device.stage_run_packets() - self.time_patcher = mock.patch('pygatt.backends.bgapi.bgapi.time') + self.time_patcher = mock.patch("pygatt.backends.bgapi.bgapi.time") self.time_patcher.start() - self.timeout_patcher = mock.patch( - 'pygatt.backends.bgapi.bgapi._timed_out') + self.timeout_patcher = mock.patch("pygatt.backends.bgapi.bgapi._timed_out") timed_out = self.timeout_patcher.start() timed_out.return_value = True @@ -44,8 +42,7 @@ def tearDown(self): self.backend.stop() def _connect(self): - self.mock_device.stage_connect_packets( - self.address, ['connected', 'completed']) + self.mock_device.stage_connect_packets(self.address, ["connected", "completed"]) device = self.backend.connect(self.address_string) self.assertNotEqual(None, device) return device @@ -58,70 +55,65 @@ def test_disconnect_when_connected(self): def test_char_read(self): device = self._connect() - uuid_char = '01234567-0123-0123-0123-0123456789AB' + uuid_char = "01234567-0123-0123-0123-0123456789AB" handle_char = 0x1234 - uuid_desc = '2902' + uuid_desc = "2902" handle_desc = 0x5678 - self.mock_device.stage_discover_characteristics_packets([ - uuid_char, handle_char, - uuid_desc, handle_desc]) + self.mock_device.stage_discover_characteristics_packets( + [uuid_char, handle_char, uuid_desc, handle_desc] + ) # Test char_read expected_value = [0xBE, 0xEF, 0x15, 0xF0, 0x0D] - self.mock_device.stage_char_read_packets( - handle_char, 0x00, expected_value) + self.mock_device.stage_char_read_packets(handle_char, 0x00, expected_value) value = device.char_read(UUID(uuid_char)) - eq_(bytearray(expected_value), value) + assert bytearray(expected_value) == value # Test ignore of packet with wrong handle expected_value = [0xBE, 0xEF, 0x15, 0xF0, 0x0D] - self.mock_device.stage_char_read_packets( - 0, 0x00, expected_value) - self.mock_device.stage_char_read_packets( - handle_char, 0x00, expected_value) + self.mock_device.stage_char_read_packets(0, 0x00, expected_value) + self.mock_device.stage_char_read_packets(handle_char, 0x00, expected_value) value = device.char_read(UUID(uuid_char)) - eq_(bytearray(expected_value), value) + assert bytearray(expected_value) == value def test_read_nonstandard_4byte_char(self): device = self._connect() - uuid_char = 0x03ea + uuid_char = 0x03EA handle_char = 0x1234 - uuid_desc = '2902' + uuid_desc = "2902" handle_desc = 0x5678 - self.mock_device.stage_discover_characteristics_packets([ - "03ea", handle_char, - uuid_desc, handle_desc]) + self.mock_device.stage_discover_characteristics_packets( + ["03ea", handle_char, uuid_desc, handle_desc] + ) expected_value = [0xBE, 0xEF, 0x15, 0xF0, 0x0D] - self.mock_device.stage_char_read_packets( - handle_char, 0x00, expected_value) + self.mock_device.stage_char_read_packets(handle_char, 0x00, expected_value) value = device.char_read(UUID(str(uuid16_to_uuid(uuid_char)))) - eq_(bytearray(expected_value), value) + assert bytearray(expected_value) == value - @raises(ExpectedResponseTimeout) def test_read_timeout_wrong_handle(self): device = self._connect() - uuid_char = '01234567-0123-0123-0123-0123456789AB' + uuid_char = "01234567-0123-0123-0123-0123456789AB" handle_char = 0x1234 - uuid_desc = '2902' + uuid_desc = "2902" handle_desc = 0x5678 - self.mock_device.stage_discover_characteristics_packets([ - uuid_char, handle_char, - uuid_desc, handle_desc]) + self.mock_device.stage_discover_characteristics_packets( + [uuid_char, handle_char, uuid_desc, handle_desc] + ) # Test char_read - expected_value = [0xBE, 0xEF, 0x15, 0xF0, 0x0D] - self.mock_device.stage_char_read_packets( - 0, 0x00, expected_value) - device.char_read(UUID(uuid_char)) + with pytest.raises(ExpectedResponseTimeout): + expected_value = [0xBE, 0xEF, 0x15, 0xF0, 0x0D] + self.mock_device.stage_char_read_packets(0, 0x00, expected_value) + device.char_read(UUID(uuid_char)) def test_char_write(self): device = self._connect() - uuid_char = '01234567-0123-0123-0123-0123456789AB' + uuid_char = "01234567-0123-0123-0123-0123456789AB" handle_char = 0x1234 - uuid_desc = '2902' + uuid_desc = "2902" handle_desc = 0x5678 - self.mock_device.stage_discover_characteristics_packets([ - uuid_char, handle_char, - uuid_desc, handle_desc]) + self.mock_device.stage_discover_characteristics_packets( + [uuid_char, handle_char, uuid_desc, handle_desc] + ) # Test char_write request value = [0xF0, 0x0F, 0x00] self.mock_device.stage_char_write_packets(0, value) @@ -135,27 +127,30 @@ def test_char_write(self): def test_bond(self): device = self._connect() self.mock_device.stage_bond_packets( - self.address, ['connected', 'encrypted', 'parameters_change']) + self.address, ["connected", "encrypted", "parameters_change"] + ) device.bond() def test_get_rssi(self): device = self._connect() # Test get_rssi self.mock_device.stage_get_rssi_packets() - eq_(-80, device.get_rssi()) + assert -80 == device.get_rssi() def test_discover_characteristics(self): device = self._connect() - uuid_char = UUID('01234567-0123-0123-0123-0123456789AB') + uuid_char = UUID("01234567-0123-0123-0123-0123456789AB") handle_char = 0x1234 - uuid_desc = '2902' + uuid_desc = "2902" handle_desc = 0x5678 - self.mock_device.stage_discover_characteristics_packets([ - str(uuid_char), handle_char, - uuid_desc, handle_desc]) + self.mock_device.stage_discover_characteristics_packets( + [str(uuid_char), handle_char, uuid_desc, handle_desc] + ) characteristics = device.discover_characteristics() - eq_(characteristics[uuid_char].handle, handle_char) - eq_(characteristics[uuid_char].descriptors[uuid16_to_uuid(0x2902)], - handle_desc) + assert characteristics[uuid_char].handle == handle_char + assert ( + characteristics[uuid_char].descriptors[uuid16_to_uuid(0x2902)] + == handle_desc + ) diff --git a/tests/gatttool/test_backend.py b/tests/gatttool/test_backend.py index 9454f803..8f7a358d 100644 --- a/tests/gatttool/test_backend.py +++ b/tests/gatttool/test_backend.py @@ -1,6 +1,5 @@ from __future__ import print_function -from nose.tools import eq_, ok_ from mock import patch, MagicMock import pexpect @@ -13,14 +12,11 @@ class GATTToolBackendTests(unittest.TestCase): def setUp(self): self.patchers = [] - self.patchers.append( - patch('pygatt.backends.gatttool.gatttool.pexpect.spawn')) + self.patchers.append(patch("pygatt.backends.gatttool.gatttool.pexpect.spawn")) self.spawn = self.patchers[0].start() self.spawn.return_value.isalive.return_value = False - self.spawn.return_value.read_nonblocking.side_effect = ( - pexpect.EOF(value=None)) - self.patchers.append( - patch('pygatt.backends.gatttool.gatttool.subprocess')) + self.spawn.return_value.read_nonblocking.side_effect = pexpect.EOF(value=None) + self.patchers.append(patch("pygatt.backends.gatttool.gatttool.subprocess")) self.patchers[1].start() # Just keep saying we got the "Connected" response @@ -45,42 +41,42 @@ def tearDown(self): def test_scan(self): # TODO mock a successful scan devices = self.backend.scan() - ok_(devices is not None) - eq_(0, len(devices)) + assert devices is not None + assert 0 == len(devices) def test_connect(self): address = "11:22:33:44:55:66" device = self.backend.connect(address) - ok_(device is not None) + assert device is not None def test_disconnect_callback(self): # Just keep saying we got the "Disconnected" response def rate_limited_expect_d(*args, **kwargs): - time.sleep(0.001) - # hard code the "Disconnected" event - return 1 + time.sleep(0.001) + # hard code the "Disconnected" event + return 1 mock_callback = MagicMock() address = "11:22:33:44:55:66" device = self.backend.connect(address) device.register_disconnect_callback(mock_callback) - eq_(mock_callback in - device._backend._receiver._event_vector[ - "disconnected"]["callback"], - True) + assert ( + mock_callback + in device._backend._receiver._event_vector["disconnected"]["callback"] + ) self.spawn.return_value.expect.side_effect = rate_limited_expect_d time.sleep(0.1) - ok_(mock_callback.called) + assert mock_callback.called device.remove_disconnect_callback(mock_callback) - eq_(mock_callback not in - device._backend._receiver._event_vector[ - "disconnected"]["callback"], - True) - eq_(len(device._backend._receiver._event_vector[ - "disconnected"]["callback"]) > 0, - True) + assert ( + mock_callback + not in device._backend._receiver._event_vector["disconnected"]["callback"] + ) + assert ( + len(device._backend._receiver._event_vector["disconnected"]["callback"]) > 0 + ) def test_auto_reconnect_call(self): # Just keep saying we got the "Disconnected" response @@ -94,7 +90,7 @@ def rate_limited_expect_d(*args, **kwargs): device._backend.reconnect = MagicMock() self.spawn.return_value.expect.side_effect = rate_limited_expect_d time.sleep(0.1) - ok_(device._backend.reconnect.called) + assert device._backend.reconnect.called def test_no_reconnect_default(self): # Just keep saying we got the "Disconnected" response @@ -108,7 +104,7 @@ def rate_limited_expect_d(*args, **kwargs): device._backend.reconnect = MagicMock() self.spawn.return_value.expect.side_effect = rate_limited_expect_d time.sleep(0.1) - ok_(not device._backend.reconnect.called) + assert not device._backend.reconnect.called def test_no_reconnect_disconnect(self): # Just keep saying we got the "Disconnected" response @@ -123,7 +119,7 @@ def rate_limited_expect_d(*args, **kwargs): device.disconnect() self.spawn.return_value.expect.side_effect = rate_limited_expect_d time.sleep(0.1) - ok_(not device._backend.reconnect.called) + assert not device._backend.reconnect.called def test_auto_reconnect(self): # Just keep saying we got the "Disconnected" response @@ -145,62 +141,54 @@ def rate_limited_expect_c(*args, **kwargs): device.resubscribe_all = MagicMock() self.spawn.return_value.expect.side_effect = rate_limited_expect_c time.sleep(0.1) - ok_(device.resubscribe_all.called) + assert device.resubscribe_all.called def test_single_byte_notification(self): - event = { - 'after': "Notification handle = 0x0024 value: 64".encode("utf8") - } + event = {"after": "Notification handle = 0x0024 value: 64".encode("utf8")} address = "11:22:33:44:55:66" device = self.backend.connect(address) device.receive_notification = MagicMock() device._backend._handle_notification_string(event) - ok_(device.receive_notification.called) - eq_(0x24, device.receive_notification.call_args[0][0]) - eq_(bytearray([0x64]), device.receive_notification.call_args[0][1]) + assert device.receive_notification.called + assert 0x24 == device.receive_notification.call_args[0][0] + assert bytearray([0x64]) == device.receive_notification.call_args[0][1] def test_multi_byte_notification(self): event = { - 'after': ( - "Notification handle = 0x0024 value: 64 46 72".encode("utf8")) + "after": ("Notification handle = 0x0024 value: 64 46 72".encode("utf8")) } address = "11:22:33:44:55:66" device = self.backend.connect(address) device.receive_notification = MagicMock() device._backend._handle_notification_string(event) - ok_(device.receive_notification.called) - eq_(0x24, device.receive_notification.call_args[0][0]) - eq_(bytearray([0x64, 0x46, 0x72]), - device.receive_notification.call_args[0][1]) + assert device.receive_notification.called + assert 0x24 == device.receive_notification.call_args[0][0] + assert ( + bytearray([0x64, 0x46, 0x72]) == device.receive_notification.call_args[0][1] + ) def test_empty_notification(self): - event = { - 'after': "Notification handle = 0x0024 value: ".encode("utf8") - } + event = {"after": "Notification handle = 0x0024 value: ".encode("utf8")} address = "11:22:33:44:55:66" device = self.backend.connect(address) device.receive_notification = MagicMock() device._backend._handle_notification_string(event) - ok_(device.receive_notification.called) + assert device.receive_notification.called def test_malformed_notification(self): - event = { - 'after': "Notification handle = 0x0024vlue: ".encode("utf8") - } + event = {"after": "Notification handle = 0x0024vlue: ".encode("utf8")} address = "11:22:33:44:55:66" device = self.backend.connect(address) device.receive_notification = MagicMock() device._backend._handle_notification_string(event) - ok_(not device.receive_notification.called) + assert not device.receive_notification.called def test_indication(self): - event = { - 'after': "Indication handle = 0x0024 value: 64".encode("utf8") - } + event = {"after": "Indication handle = 0x0024 value: 64".encode("utf8")} address = "11:22:33:44:55:66" device = self.backend.connect(address) device.receive_notification = MagicMock() device._backend._handle_notification_string(event) - ok_(device.receive_notification.called) - eq_(0x24, device.receive_notification.call_args[0][0]) - eq_(bytearray([0x64]), device.receive_notification.call_args[0][1]) + assert device.receive_notification.called + assert 0x24 == device.receive_notification.call_args[0][0] + assert bytearray([0x64]) == device.receive_notification.call_args[0][1] diff --git a/tests/gatttool/test_device.py b/tests/gatttool/test_device.py index 3862cdcc..a187d94b 100644 --- a/tests/gatttool/test_device.py +++ b/tests/gatttool/test_device.py @@ -1,7 +1,7 @@ +import pytest import unittest import uuid from mock import MagicMock, patch -from nose.tools import ok_, eq_, raises from pygatt.exceptions import NotConnectedError from pygatt.backends import Characteristic @@ -23,63 +23,63 @@ def setUp(self): def test_bond(self): self.device.bond() - ok_(self.backend.bond.called) - eq_(self.device, self.backend.bond.call_args[0][0]) + assert self.backend.bond.called + assert self.device == self.backend.bond.call_args[0][0] def test_char_read(self): expected_value = bytearray(range(4)) self.backend.char_read.return_value = expected_value - with patch.object(self.backend, 'get_handle', return_value=24 - ) as get_handle: + with patch.object(self.backend, "get_handle", return_value=24) as get_handle: char_uuid = uuid.uuid4() value = self.device.char_read(char_uuid) - ok_(not get_handle.called) - ok_(self.backend.char_read.called) - eq_(self.device, self.backend.char_read.call_args[0][0]) - eq_(char_uuid, self.backend.char_read.call_args[0][1]) - eq_(expected_value, value) + assert not get_handle.called + assert self.backend.char_read.called + assert self.device == self.backend.char_read.call_args[0][0] + assert char_uuid == self.backend.char_read.call_args[0][1] + assert expected_value == value def test_char_write(self): - with patch.object(self.device, 'get_handle', return_value=24 - ) as get_handle: + with patch.object(self.device, "get_handle", return_value=24) as get_handle: char_uuid = uuid.uuid4() value = bytearray(range(4)) self.device.char_write(char_uuid, value) - ok_(get_handle.called) - eq_(char_uuid, get_handle.call_args[0][0]) - ok_(self.backend.char_write_handle.called) - eq_(self.device, self.backend.char_write_handle.call_args[0][0]) - eq_(24, self.backend.char_write_handle.call_args[0][1]) - eq_(value, self.backend.char_write_handle.call_args[0][2]) + assert get_handle.called + assert char_uuid == get_handle.call_args[0][0] + assert self.backend.char_write_handle.called + assert self.device == self.backend.char_write_handle.call_args[0][0] + assert 24 == self.backend.char_write_handle.call_args[0][1] + assert value == self.backend.char_write_handle.call_args[0][2] def test_disconnect(self): self.device.disconnect() - ok_(self.backend.disconnect.called) - eq_(self.device, self.backend.disconnect.call_args[0][0]) + assert self.backend.disconnect.called + assert self.device == self.backend.disconnect.call_args[0][0] def test_additional_disconnect_callback(self): mock_callback = MagicMock() self.device.register_disconnect_callback(mock_callback) self.backend._receiver.register_callback.assert_called_with( - "disconnected", mock_callback) + "disconnected", mock_callback + ) self.device.remove_disconnect_callback(mock_callback) self.backend._receiver.remove_callback.assert_called_with( - "disconnected", mock_callback) + "disconnected", mock_callback + ) - @raises(NotConnectedError) def test_write_after_disconnect(self): - self.device.disconnect() - self.device.char_read(uuid.uuid4()) + with pytest.raises(NotConnectedError): + self.device.disconnect() + self.device.char_read(uuid.uuid4()) def test_get_handle(self): handle = self.device.get_handle(self.char_uuid) - ok_(self.backend.discover_characteristics.called) - eq_(self.device, self.backend.discover_characteristics.call_args[0][0]) - eq_(self.expected_handle, handle) + assert self.backend.discover_characteristics.called + assert self.device == self.backend.discover_characteristics.call_args[0][0] + assert self.expected_handle == handle def test_get_cached_handle(self): handle = self.device.get_handle(self.char_uuid) - with patch.object(self.backend, 'discover_characteristics') as discover: + with patch.object(self.backend, "discover_characteristics") as discover: next_handle = self.device.get_handle(self.char_uuid) - eq_(handle, next_handle) - ok_(not discover.called) + assert handle == next_handle + assert not discover.called diff --git a/tests/serial_mock.py b/tests/serial_mock.py index fddb74c5..6051f81d 100644 --- a/tests/serial_mock.py +++ b/tests/serial_mock.py @@ -11,6 +11,7 @@ class SerialMock(object): """ Spoof a serial.Serial object. """ + def __init__(self, port, timeout): self._isOpen = True self._port = port diff --git a/tests/test_device.py b/tests/test_device.py index 4b3bede9..8c63591f 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -1,20 +1,17 @@ import unittest import uuid from mock import MagicMock, patch -from nose.tools import ok_, eq_ from pygatt import BLEDevice from pygatt.backends import Characteristic -class TestBLEDevice(BLEDevice): +class MockBLEDevice(BLEDevice): CHAR_UUID = uuid.uuid4() EXPECTED_HANDLE = 99 def discover_characteristics(self): - return { - self.CHAR_UUID: Characteristic(self.CHAR_UUID, self.EXPECTED_HANDLE) - } + return {self.CHAR_UUID: Characteristic(self.CHAR_UUID, self.EXPECTED_HANDLE)} class BLEDeviceTest(unittest.TestCase): @@ -22,23 +19,23 @@ def setUp(self): super(BLEDeviceTest, self).setUp() self.address = "11:22:33:44:55:66" self.backend = MagicMock() - self.device = TestBLEDevice(self.address) + self.device = MockBLEDevice(self.address) def _subscribe(self): callback = MagicMock() - with patch.object(self.device, 'char_write_handle') as char_write: + with patch.object(self.device, "char_write_handle") as char_write: self.device.subscribe(self.device.CHAR_UUID, callback=callback) - ok_(char_write.called) - eq_(self.device.EXPECTED_HANDLE + 1, char_write.call_args[0][0]) - eq_(bytearray([1, 0]), char_write.call_args[0][1]) + assert char_write.called + assert self.device.EXPECTED_HANDLE + 1 == char_write.call_args[0][0] + assert bytearray([1, 0]) == char_write.call_args[0][1] return callback def _unsubscribe(self): callback = MagicMock() - with patch.object(self.device, 'char_write_handle') as char_write: + with patch.object(self.device, "char_write_handle") as char_write: self.device.unsubscribe(self.device.CHAR_UUID) - eq_(self.device.EXPECTED_HANDLE + 1, char_write.call_args[0][0]) - eq_(bytearray([0, 0]), char_write.call_args[0][1]) + assert self.device.EXPECTED_HANDLE + 1 == char_write.call_args[0][0] + assert bytearray([0, 0]) == char_write.call_args[0][1] return callback def test_subscribe(self): @@ -51,25 +48,23 @@ def test_unsubscribe(self): def test_subscribe_another_callback(self): self._subscribe() another_callback = MagicMock() - with patch.object(self.device, 'char_write_handle') as char_write: - self.device.subscribe(self.device.CHAR_UUID, - callback=another_callback) - ok_(not char_write.called) + with patch.object(self.device, "char_write_handle") as char_write: + self.device.subscribe(self.device.CHAR_UUID, callback=another_callback) + assert not char_write.called def test_receive_notification(self): callback = self._subscribe() value = bytearray([24]) - self.device.receive_notification(TestBLEDevice.EXPECTED_HANDLE, value) - ok_(callback.called) - eq_(TestBLEDevice.EXPECTED_HANDLE, callback.call_args[0][0]) - eq_(value, callback.call_args[0][1]) + self.device.receive_notification(MockBLEDevice.EXPECTED_HANDLE, value) + assert callback.called + assert MockBLEDevice.EXPECTED_HANDLE == callback.call_args[0][0] + assert value == callback.call_args[0][1] def test_ignore_notification_for_another_handle(self): callback = self._subscribe() value = bytearray([24]) - self.device.receive_notification( - TestBLEDevice.EXPECTED_HANDLE + 1, value) - ok_(not callback.called) + self.device.receive_notification(MockBLEDevice.EXPECTED_HANDLE + 1, value) + assert not callback.called def test_unicode_get_handle(self): try: diff --git a/tox.ini b/tox.ini index b973fe92..9f6b7deb 100644 --- a/tox.ini +++ b/tox.ini @@ -4,16 +4,17 @@ # and then run "tox" from this directory. [tox] -envlist = py27 - py39 +envlist = py39 + p312 [testenv] sitepackages=False -commands = nosetests tests +commands = pytest tests flake8 deps = -r{toxinidir}/requirements.txt pexpect [flake8] -max-line-length = 80 +ignore = W503, E203 +max-line-length = 88 exclude=*.egg,.eggs,.tox,./lib,./bin