Skip to content

Commit

Permalink
refactor DeviceDiscoveryDialog class
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberRoute committed Sep 28, 2024
1 parent 7451b6d commit 7ffd997
Showing 1 changed file with 105 additions and 88 deletions.
193 changes: 105 additions & 88 deletions core/arp_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,142 +57,159 @@ def __init__(self, interface, oui_url, parent=None):

self._ui = Ui_DeviceDiscovery()
self._ui.setupUi(self)

# Initialize the UI and connection setup
self.setup_ui_elements()

# Threadpool for multi-threading
self.threadpool = QThreadPool()
print(f"Multithreading with maximum {self.threadpool.maxThreadCount()} threads")

# Initialize scanner and device info storage
self.scanner_timer = None
self.device_info = {} # Store dynamic device info here

def setup_ui_elements(self):
"""Sets up the UI elements and connections."""
self.setWindowIcon(QIcon("images/phantom_logo.png"))

self._ui.scan.clicked.connect(self.toggle_scan)
net.enable_ip_forwarding()
self._ui.quit.clicked.connect(self.quit_application)
self.interface_label = QLabel(f"Interface: {self.interface}")
self.interface_label.setStyleSheet("color: black")
self.os_label = QLabel(f"OS: {get_os()}")
self.os_label.setStyleSheet("color: black")

self._ui.verticalLayout.addWidget(self.os_label)
self._ui.verticalLayout.addWidget(self.interface_label)
self._ui.list.itemClicked.connect(self.open_device_details)
self.ip_address = None
self.mac = None
self.hostname = None
self.vendor = None

self.threadpool = QThreadPool()
print(f"Multithreading with maximum {self.threadpool.maxThreadCount()} threads")
# Add static labels and list widgets
self.add_static_ui_labels()
self.add_list_widget_to_tab_1()

self._ui.scan.setEnabled(True)
# Set the default font for the list items
self.setup_font_for_list_widgets()

self.resize(800, 600)

font = QFont()
font.setPointSize(12)
self._ui.list.setFont(font)
self._ui.listpkt.setFont(font)
def add_static_ui_labels(self):
"""Adds static labels like interface and OS information."""
interface_label = QLabel(f"Interface: {self.interface}")
interface_label.setStyleSheet("color: black")

self.add_list_widget_to_tab_1()
os_label = QLabel(f"OS: {get_os()}")
os_label.setStyleSheet("color: black")

self._ui.verticalLayout.addWidget(os_label)
self._ui.verticalLayout.addWidget(interface_label)

def add_list_widget_to_tab_1(self):
"""Adds a QListWidget to the first tab of the UI."""
self.list_widget_tab7 = QListWidget()
self.list_widget_tab7 = QListWidget() # pylint: disable=attribute-defined-outside-init
tab7_layout = QVBoxLayout(self._ui.tab_1)
tab7_layout.addWidget(self.list_widget_tab7)

# description_item_1 = QListWidgetItem("Devices detected in tab 7")
# description_item_1.setBackground(QColor(Qt.darkGray))
# description_item_1.setForeground(QColor(Qt.white))

# description_item_2 = QListWidgetItem("ARP packets in tab 7")
# description_item_2.setBackground(QColor(Qt.darkGray))
# description_item_2.setForeground(QColor(Qt.white))

# self.list_widget_tab7.addItem(description_item_1)
# self.list_widget_tab7.addItem(description_item_2)

def add_packet_to_list(self, packet_summary):
"""Adds a packet summary to the list widget in the first tab."""
packet_item = QListWidgetItem(packet_summary)
packet_item.setBackground(QColor(Qt.black))
packet_item.setForeground(QColor(Qt.white))
self.list_widget_tab7.addItem(packet_item)
def setup_font_for_list_widgets(self):
"""Sets up a uniform font for list widgets."""
font = QFont()
font.setPointSize(12)
self._ui.list.setFont(font)
self._ui.listpkt.setFont(font)
self._ui.tab_1.setFont(font)

@Slot(QListWidgetItem)
def open_device_details(self, item):
"""click on device open another window with details"""
selected_text = item.text()
parts = selected_text.split()

if len(parts) >= 4:
self.ip_address = parts[0]
self.mac = parts[1]
self.hostname = parts[2]
self.vendor = " ".join(parts[3:])

self.device_details_window = DeviceDetailsWindow( # pylint: disable=attribute-defined-outside-init
self.ip_address,
self.mac,
self.hostname,
self.vendor
)
self.device_details_window.show()
"""Click on a device opens another window with details."""
self.device_info = self.parse_device_details(item.text())
if self.device_info:
self.show_device_details_window()
else:
print("Invalid format: Not enough information")

def parse_device_details(self, text):
"""Parses device details from the selected list item text."""
parts = text.split()
if len(parts) >= 4:
return {
"ip_address": parts[0],
"mac": parts[1],
"hostname": parts[2],
"vendor": " ".join(parts[3:])
}
return None

def show_device_details_window(self):
"""Opens a window showing detailed device information."""
self.device_details_window = DeviceDetailsWindow( # pylint: disable=attribute-defined-outside-init
self.device_info['ip_address'],
self.device_info['mac'],
self.device_info['hostname'],
self.device_info['vendor']
)
self.device_details_window.show()

@Slot()
def toggle_scan(self):
"""keep scannning all local network every 1s"""
"""Toggle scanning all local networks every 1s."""
self._ui.scan.setEnabled(False)
self.timer_arp = QTimer(self) # pylint: disable=attribute-defined-outside-init
self.timer_arp.setInterval(1000)
self.timer_arp.timeout.connect(self.start_scan)
self.timer_arp.start()
self.scanner_timer = self.setup_scanner_timer()
self.scanner_timer.start()

def setup_scanner_timer(self):
"""Sets up the scanner timer to periodically trigger ARP scanning."""
timer_arp = QTimer(self)
timer_arp.setInterval(1000)
timer_arp.timeout.connect(self.start_scan)
return timer_arp

@Slot()
def start_scan(self):
"""start scanning"""
"""Starts scanning the network."""
self.arp_scanner_thread = ARPScannerThread(self.interface, self.mac_vendor_lookup) # pylint: disable=attribute-defined-outside-init
self.arp_scanner_thread.finished.connect(self.handle_scan_results)
self.arp_scanner_thread.verbose_output.connect(self.update_tab7_verbose_output)
self.arp_scanner_thread.start()

@Slot(list)
def handle_scan_results(self, results):
"""update scan results"""
"""Updates the scan results."""
for ip_address, mac, hostname, device_vendor, packet in results:
label = f"{ip_address} {mac} {hostname}, {device_vendor}"
items = self._ui.list.findItems(label, Qt.MatchExactly)
if not items:
item = QListWidgetItem(label)
item.setBackground(QColor(Qt.black))
item.setForeground(QColor(Qt.white))
self._ui.list.addItem(item)
label = str(packet)
items = self._ui.listpkt.findItems(label, Qt.MatchExactly)
if not items:
item = QListWidgetItem(label)
item.setBackground(QColor(Qt.black))
item.setForeground(QColor(Qt.white))
self._ui.listpkt.addItem(item)
# Add device to the list if not already present
self.add_device_to_list(ip_address, mac, hostname, device_vendor)

# Add packet to the packet list if not already present
packet_label = str(packet)
self.add_packet_if_new(packet_label)

self._ui.scan.setEnabled(True)

def add_device_to_list(self, ip_address, mac, hostname, device_vendor):
"""Adds a device to the list widget in the UI."""
label = f"{ip_address} {mac} {hostname}, {device_vendor}"
if not self._ui.list.findItems(label, Qt.MatchExactly):
item = QListWidgetItem(label)
item.setBackground(QColor(Qt.black))
item.setForeground(QColor(Qt.white))
self._ui.list.addItem(item)

def add_packet_if_new(self, packet_label):
"""Adds a packet to the listpkt widget if it doesn't already exist."""
existing_items = self._ui.listpkt.findItems(packet_label, Qt.MatchExactly)
if not existing_items: # Add only if the packet is not already listed
packet_item = QListWidgetItem(packet_label)
packet_item.setBackground(QColor(Qt.black))
packet_item.setForeground(QColor(Qt.white))
self._ui.listpkt.addItem(packet_item)

@Slot(str)
def update_tab7_verbose_output(self, verbose_output):
"""update tab7"""
# Update the list_widget_tab7 with verbose output
font = QFont()
font.setPointSize(12)
self._ui.tab_1.setFont(font)

"""Updates the list_widget_tab7 with verbose output."""
for line in verbose_output.splitlines():
if line.strip(): # Skip empty lines
item = QListWidgetItem(line)
item.setBackground(QColor(Qt.black))
item.setForeground(QColor(Qt.white))
self.list_widget_tab7.addItem(item)
packet_item = QListWidgetItem(line)
packet_item.setBackground(QColor(Qt.black))
packet_item.setForeground(QColor(Qt.white))
self.list_widget_tab7.addItem(packet_item)

def quit_application(self):
"""quit the app"""
"""Quit the application."""
self._ui.quit.setEnabled(False)
net.disable_ip_forwarding()

# Stop any running threads safely
if hasattr(self, 'arp_scanner_thread') and self.arp_scanner_thread.isRunning():
self.arp_scanner_thread.terminate()
self.arp_scanner_thread.wait()
Expand Down

0 comments on commit 7ffd997

Please sign in to comment.