Skip to content

Commit

Permalink
update connection on sys exit
Browse files Browse the repository at this point in the history
  • Loading branch information
salman2135 committed Nov 9, 2023
1 parent 592cae5 commit e502902
Showing 1 changed file with 62 additions and 23 deletions.
85 changes: 62 additions & 23 deletions src/explorepy/serial_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# -*- coding: utf-8 -*-
"""A module for bluetooth connection"""
import logging
import os
import subprocess
import sys

import serial
import serial.tools.list_ports as lp
import time
Expand All @@ -23,35 +27,41 @@ class SerialClient:
def __init__(self, device_name):
"""Initialize Bluetooth connection
"""
self.mac_address = None
self.is_connected = False
self.device_name = device_name
self.bt_serial_port_manager = None
self.device_manager = None
self.bt_sdk = None

def connect(self):
"""Connect to the device and return the socket
Returns:
socket (bluetooth.socket)
"""

self.device_name = "Explore_" + self.device_name
config_manager = settings_manager.SettingsManager(self.device_name)
self.mac_address = config_manager.get_mac_address()
if self.mac_address is None:
self._find_mac_address()
config_manager.set_mac_address(self.mac_address)

for _ in range(5):

try:
#if len(lp.grep(self.device_name)) > 0:
self.bt_serial_port_manager = serial.Serial('/dev/tty.Explore_84E4', 9600, timeout=1)
self.bt_serial_port_manager.reset_input_buffer()
self.connect_bluetooth_device(self.mac_address)
self.bt_serial_port_manager = serial.Serial('/dev/tty.' + self.device_name, 9600)
print('/dev/tty.' + self.device_name)
self.is_connected = True
return 0
except Exception as error:
self.is_connected = False
logger.debug(
"Got an exception while connecting to the device: {} of type: {}".format(error, type(error))
)
logger.debug('trying to connect again as tty port is not visible yet')
logger.warning("Could not connect; Retrying in 2s...")
time.sleep(2)
return -1

self.is_connected = False
raise DeviceNotFoundError(
Expand All @@ -64,29 +74,42 @@ def reconnect(self):
This function reconnects to the last bluetooth socket. If after 1 minute the connection doesn't succeed,
program will end.
"""
self.is_connected = False
logger.error("Could not reconnect after 5 attempts. Closing the socket.")
return None

def reconnect(self):
"""Reconnect to the last used bluetooth socket.
This function reconnects to the last bluetooth socket. If after 1 minute the connection doesn't succeed,
program will end.
"""
self.is_connected = False
for _ in range(5):
connection_error_code = self.bt_serial_port_manager.Connect()
logger.debug("Got an exception while connecting to the device: {}".format(connection_error_code))
if connection_error_code == 0:
self.is_connected = True
logger.info('Connected to the device')
return self.bt_serial_port_manager
else:
self.is_connected = False
logger.warning("Couldn't connect to the device. Trying to reconnect...")
time.sleep(2)
logger.error("Could not reconnect after 5 attempts. Closing the socket.")
return None

def disconnect(self):
"""Disconnect from the device"""
self.is_connected = False
self.bt_serial_port_manager.Close()
self.bt_serial_port_manager.close()

def _find_mac_address(self):
self.device_manager = exploresdk.ExploreSDK.Create()
for _ in range(5):
available_list = self.device_manager.PerformDeviceSearch()
logger.debug("Number of devices found: {}".format(len(available_list)))
for bt_device in available_list:
if bt_device.name == self.device_name:
self.mac_address = bt_device.address
return

logger.warning("No device found with the name: %s, searching again...", self.device_name)
time.sleep(0.1)
raise DeviceNotFoundError("No device found with the name: {}".format(self.device_name))

if self.device_name[8] == '8':
self.mac_default = '00:13:43:A1:'
else:
self.mac_default = '00:13:43:93:'
print(self.device_name[8])
id_to_mac = self.device_name[-4:-2] + ':' + self.device_name[-2:]

self.mac_address = self.mac_default + id_to_mac
def read(self, n_bytes):
"""Read n_bytes from the socket
Expand Down Expand Up @@ -114,6 +137,22 @@ def send(self, data):
"""
self.bt_serial_port_manager.write(data)

def disconnect(self):
"""Disconnect from the device"""
self.is_connected = False
self.bt_serial_port_manager.Close()

@staticmethod
def _check_mac_address(device_name, mac_address):
return (device_name[-4:-2] == mac_address[-5:-3]) and (device_name[-2:] == mac_address[-2:])

def connect_bluetooth_device(self, device_address, wait=False):
if wait == True:
connect_string = '--wait-connect 5'
else:
connect_string = '--connect'
try:
subprocess.run(["blueutil", connect_string, device_address], check=True)
print(f"Attempted to connect to the device with address: {device_address}")
except subprocess.CalledProcessError as e:
print(f"Failed to connect to the device: {e}")

0 comments on commit e502902

Please sign in to comment.