Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bluegiga adapter support #126

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,35 @@ Install from pypi
$ python -m pip install ruuvitag-sensor==1.2.1
```

## Bluegiga

Use Bluegiga's BGAPI, which is compatible with USB adapters like the BLED112. Bluegiga should work with Linux, macOS and Windows.

Requires pygatt and pexpect, which are not installed automatically with `ruuvitag_sensor` package. You can install those manually e.g. via pip.

```sh
$ pip install pygatt pexpect
```

Add environment variable `RUUVI_BLE_ADAPTER` with value `Bluegiga`. E.g.

```sh
$ export RUUVI_BLE_ADAPTER="Bluegiga"
```
By default, pygatt will automatically detect USB adapters serial port, but if you have multiple Bluegiga adapters installed or pygatt can not find correct serial port automatically, serial port can be passed with `bt_device` parameter.

```sh
bt_device='/dev/ttyACM0'
```

Pygatt reset Bluegiga USB adapter during start, which might cause issues e.g. in VM environment. Reset can be disabled by environment variable `BLUEGIGA_RESET` with value `False`. E.g.

```sh
$ export BLUEGIGA_RESET="False"
```

Any other value is interpreted as a True.

## Examples

Examples are in [examples](https://github.com/ttu/ruuvitag-sensor/tree/master/examples) directory, e.g.
Expand Down
3 changes: 3 additions & 0 deletions ruuvitag_sensor/adapters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ def get_ble_adapter():
elif 'bleson' in os.environ.get('RUUVI_BLE_ADAPTER', '').lower():
from ruuvitag_sensor.adapters.bleson import BleCommunicationBleson
return BleCommunicationBleson()
elif 'bluegiga' in os.environ.get('RUUVI_BLE_ADAPTER', '').lower():
from ruuvitag_sensor.adapters.bluegiga import BleCommunicationBluegiga
return BleCommunicationBluegiga()
elif 'RUUVI_NIX_FROMFILE' in os.environ:
# Emulate BleCommunicationNix by reading hcidump data from a file
from ruuvitag_sensor.adapters.nix_hci_file import BleCommunicationNixFile
Expand Down
154 changes: 154 additions & 0 deletions ruuvitag_sensor/adapters/bluegiga.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import logging
from multiprocessing import Manager
from multiprocessing.managers import DictProxy
from multiprocessing.queues import Queue
import os
import time
from typing import Iterator, List
import pygatt
import binascii
import threading


from ruuvitag_sensor.adapters import BleCommunication
from ruuvitag_sensor.ruuvi_types import MacAndRawData, RawData

log = logging.getLogger(__name__)


class BleCommunicationBluegiga(BleCommunication):
"""Bluetooth LE communication for Bluegiga"""

@staticmethod
def get_first_data(mac: str, bt_device: str = '') -> RawData:
if not bt_device:
adapter = pygatt.BGAPIBackend()
else:
adapter = pygatt.BGAPIBackend(bt_device)

def scan_received(devices, addr, packet_type):
if mac and mac == addr:
log.debug('Received data from device: %s %s', addr, packet_type)
return True # stop scan

reset = False if os.environ.get('BLUEGIGA_RESET', '').upper() == 'FALSE' else True
adapter.start(reset=reset)
log.debug('Start receiving broadcasts (device %s)', bt_device)
try:
devices = adapter.scan(timeout=60, active=False, scan_cb=scan_received)
for dev in devices:
if mac and mac == dev['address']:
log.debug('Result found for device %s', mac)
rawdata = dev['packet_data']['non-connectable_advertisement_packet']['manufacturer_specific_data'] # noqa: E501
hexa = binascii.hexlify(rawdata).decode('ascii').upper()
hexa_formatted = BleCommunicationBluegiga._fix_payload(hexa)
log.debug('Data found: %s', hexa_formatted)
return hexa_formatted
finally:
adapter.stop()

@staticmethod
def get_data(blacklist: List[str] = [], bt_device: str = '') -> Iterator[MacAndRawData]:
m = Manager()
q = m.Queue()

# Use Manager dict to share data between processes
shared_data = m.dict()
shared_data['blacklist'] = blacklist
shared_data['stop'] = False

# Start background process
scanner = threading.Thread(
name='Bluegiga scanner',
target=BleCommunicationBluegiga._run_get_data_background,
args=[q, shared_data, bt_device])
scanner.start()

try:
while True:
while not q.empty():
data = q.get()
log.debug('Found data: %s', data)
yield data
time.sleep(0.1)
if not scanner.is_alive():
raise Exception('Bluegiga scanner is not alive')
except GeneratorExit:
pass
except KeyboardInterrupt:
pass
except Exception as ex:
log.info(ex)

log.debug('Stop')
shared_data['stop'] = True
scanner.join()
log.debug('Exit')
return

@staticmethod
def _run_get_data_background(queue: Queue, shared_data: DictProxy, bt_device: str):
"""
Attributes:
device (string): BLE device (default auto)
"""

if bt_device:
adapter = pygatt.BGAPIBackend(bt_device)
else:
adapter = pygatt.BGAPIBackend()

reset = False if os.environ.get('BLUEGIGA_RESET', '').upper() == 'FALSE' else True
ttu marked this conversation as resolved.
Show resolved Hide resolved
adapter.start(reset=reset)
try:
while True:
try:
if shared_data['stop']:
break
devices = adapter.scan(timeout=0.5, active=False, )
for dev in devices:
log.debug('received: %s', dev)
mac = str(dev['address'])
if mac and mac in shared_data['blacklist']:
log.debug('MAC blacklisted: %s', mac)
continue
try:
rawdata = dev['packet_data']['non-connectable_advertisement_packet']['manufacturer_specific_data'] # noqa: E501
log.debug('Received manufacturer data from %s: %s', mac, rawdata)
hexa = binascii.hexlify(rawdata).decode('ascii').upper()
hexa_formatted = BleCommunicationBluegiga._fix_payload(hexa)
queue.put((mac, hexa_formatted))
except KeyError:
pass

# Prevent endless loop if device data never found
# Remove when #81 is fixed
queue.put(('', ''))
except GeneratorExit:
return
except KeyboardInterrupt:
return
finally:
log.debug('Stop scan')
adapter.stop()

@staticmethod
def _fix_payload(data: str) -> str:
# Adapter returns data in a different format than the nix_hci
# adapter. Since the rest of the processing pipeline is
# somewhat reliant on the additional data, add to the
# beginning of the actual data:
#
# - An FF type marker
# - A length marker, covering the vendor specific data
# - Another length marker, covering the length-marked
# vendor data.
#
# Thus extended, the result can be parsed by the rest of
# the pipeline.
#
# TODO: This is kinda awkward, and should be handled better.
data = f'FF{data.hex()}'
data = f'{(len(data) >> 1):02x}{data}'
data = f'{(len(data) >> 1):02x}{data}'
return data