diff --git a/.pylintrc b/.pylintrc index 9f97a58..3cd3421 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,2 +1,3 @@ [MASTER] init-hook='import sys; sys.path.append(".")' +extension-pkg-allow-list=netifaces \ No newline at end of file diff --git a/core/arp_scanner.py b/core/arp_scanner.py index eb8b007..dc64af2 100644 --- a/core/arp_scanner.py +++ b/core/arp_scanner.py @@ -1,11 +1,11 @@ """ -Arp Scanner +Module Arp Scanner """ import io import sys import socket import netifaces -import scapy.all as scapy +from scapy.all import arping, ARP, get_if_addr # pylint: disable=E0611 from PySide6.QtWidgets import ( # pylint: disable=E0611 QMainWindow, QVBoxLayout, @@ -50,42 +50,64 @@ def __init__(self, ip_address, mac_address, hostname, device_vendor): self.setCentralWidget(central_widget) -class Worker(QRunnable): +class Worker(QRunnable): # pylint: disable=too-few-public-methods """ - Worker thread + A worker thread for starting the packet collector. + + Attributes: + packet_collector: The object responsible for capturing network packets. """ def __init__(self, packet_collector): + """ + Initializes the Worker with the packet collector. + + Args: + packet_collector: The packet collector object responsible for network packet capture. + """ super().__init__() self.packet_collector = packet_collector @Slot() def run(self): + """ + Runs the packet collector to start capturing packets. + """ print("Sniffer Thread start") self.packet_collector.start_capture() print("Sniffer Thread complete") -class StopWorker(QRunnable): +class StopWorker(QRunnable): # pylint: disable=too-few-public-methods """ - Worker thread to stop the packet collector + A worker thread to stop the packet collector. + + Attributes: + packet_collector: The object responsible for capturing network packets. """ def __init__(self, packet_collector): + """ + Initializes the StopWorker with the packet collector. + + Args: + packet_collector: The packet collector object responsible for network packet capture. + """ super().__init__() self.packet_collector = packet_collector @Slot() def run(self): """ - Code to stop the packet collector + Stops the packet collector. """ print("Stopping Sniffer Thread") self.packet_collector.stop_capture() print("Stopped Sniffer Thread") -class DeviceDiscoveryDialog(QDialog): +class DeviceDiscoveryDialog(QDialog): # pylint: disable=too-many-instance-attributes + """Device Discovery""" def __init__(self, interface, oui_url, parent=None): super().__init__(parent) self.interface = interface @@ -112,7 +134,7 @@ def __init__(self, interface, oui_url, parent=None): self.vendor = None self.threadpool = QThreadPool() - print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount()) + print(f"Multithreading with maximum {self.threadpool.maxThreadCount()} threads") self._ui.scan.setEnabled(True) @@ -132,6 +154,7 @@ def __init__(self, interface, oui_url, parent=None): # self._ui.verticalLayout.addWidget(self.progress_label) def add_list_widget_to_tab_1(self): + """Adds a QListWidget to the first tab of the UI.""" self.list_widget_tab7 = QListWidget() tab7_layout = QVBoxLayout(self._ui.tab_1) tab7_layout.addWidget(self.list_widget_tab7) @@ -148,6 +171,7 @@ def add_list_widget_to_tab_1(self): # self.list_widget_tab7.addItem(description_item_2) def add_packet_to_list(self, packet_summary): + """Adds a packet summary to the list widget in the first tab.""" packet_item = QListWidgetItem(packet_summary) packet_item.setBackground(QColor(Qt.black)) packet_item.setForeground(QColor(Qt.white)) @@ -155,8 +179,8 @@ def add_packet_to_list(self, packet_summary): @Slot(QListWidgetItem) def open_device_details(self, item): + """click on device open another window with details""" selected_text = item.text() - parts = selected_text.split() if len(parts) >= 4: @@ -165,7 +189,7 @@ def open_device_details(self, item): self.hostname = parts[2] self.vendor = " ".join(parts[3:]) - self.device_details_window = DeviceDetailsWindow( + self.device_details_window = DeviceDetailsWindow( # pylint: disable=attribute-defined-outside-init self.ip_address, self.mac, self.hostname, @@ -177,15 +201,17 @@ def open_device_details(self, item): @Slot() def toggle_scan(self): + """keep scannning all local network every 1s""" self._ui.scan.setEnabled(False) - self.timer_arp = QTimer(self) + self.timer_arp = QTimer(self) # pylint: disable=attribute-defined-outside-init self.timer_arp.setInterval(1000) self.timer_arp.timeout.connect(self.start_scan) self.timer_arp.start() @Slot() def start_scan(self): - self.arp_scanner_thread = ARPScannerThread(self.interface, self.mac_vendor_lookup) + """start scanning""" + self.arp_scanner_thread = ARPScannerThread(self.interface, self.mac_vendor_lookup) # pylint: disable=attribute-defined-outside-init self.arp_scanner_thread.finished.connect(self.handle_scan_results) self.arp_scanner_thread.progress_updated.connect(self.update_progress) self.arp_scanner_thread.verbose_output.connect(self.update_tab7_verbose_output) @@ -193,6 +219,7 @@ def start_scan(self): @Slot(list) def handle_scan_results(self, results): + """update scan results""" for ip_address, mac, hostname, device_vendor, packet in results: label = f"{ip_address} {mac} {hostname}, {device_vendor}" items = self._ui.list.findItems(label, Qt.MatchExactly) @@ -212,10 +239,12 @@ def handle_scan_results(self, results): @Slot(int) def update_progress(self, value): + """update progress unused""" self.progress_label.setText(f"Progress: {value}%") @Slot(str) def update_tab7_verbose_output(self, verbose_output): + """update tab7""" # Update the list_widget_tab7 with verbose output font = QFont() font.setPointSize(12) @@ -229,6 +258,7 @@ def update_tab7_verbose_output(self, verbose_output): self.list_widget_tab7.addItem(item) def quit_application(self): + """quit the app""" self._ui.quit.setEnabled(False) net.disable_ip_forwarding() @@ -245,8 +275,10 @@ def quit_application(self): QTimer.singleShot(2000, self.close) class ARPScanner: + """Arp Scanner""" @staticmethod def calculate_network_cidr(ip_address, subnet_mask): + """calculate network cidr""" # Split the IP address and subnet mask into octets ip_octets = [int(octet) for octet in ip_address.split('.')] subnet_octets = [int(octet) for octet in subnet_mask.split('.')] @@ -264,13 +296,17 @@ def calculate_network_cidr(ip_address, subnet_mask): @staticmethod def get_hostname(ip_address): + "get hostname" try: hostname = socket.gethostbyaddr(ip_address)[0] return hostname - except Exception as e: - return "N/A" # Return "N/A" if hostname retrieval fails + except socket.herror: + return "N/A" + except socket.gaierror: + return "N/A" -class ARPScannerThread(QThread): +class ARPScannerThread(QThread): # pylint: disable=too-few-public-methods + """Executing arp scan in separate thread""" finished = pyqtSignal(list) progress_updated = pyqtSignal(int) verbose_output = pyqtSignal(str) # Signal to emit verbose output @@ -282,7 +318,8 @@ def __init__(self, interface, mac_vendor_lookup, timeout=1): self.timeout = timeout def run(self): - ip_address = scapy.get_if_addr(self.interface) + "run the scan" + ip_address = get_if_addr(self.interface) try: netmask = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['netmask'] network = ARPScanner.calculate_network_cidr(ip_address, netmask) @@ -296,13 +333,13 @@ def run(self): # Redirect stdout to capture Scapy output sys.stdout = io.StringIO() - arp_packets = scapy.arping(network, timeout=self.timeout, verbose=1)[0] + arp_packets = arping(network, timeout=self.timeout, verbose=1)[0] # Get the verbose output verbose_output = sys.stdout.getvalue() self.verbose_output.emit(verbose_output) # Emit the verbose output - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught print(f"Error during ARP scan: {e}") self.finished.emit([]) return @@ -310,12 +347,12 @@ def run(self): sys.stdout = original_stdout # Reset the standard output to its original state for _, packet in enumerate(arp_packets): - if packet[1].haslayer(scapy.ARP): - ip_address = packet[1][scapy.ARP].psrc - mac = packet[1][scapy.ARP].hwsrc - vendor = self.mac_vendor_lookup.lookup_vendor(mac) + if packet[1].haslayer(ARP): + ip_address = packet[1][ARP].psrc + mac = packet[1][ARP].hwsrc + device_vendor = self.mac_vendor_lookup.lookup_vendor(mac) hostname = ARPScanner.get_hostname(ip_address) - arp_results.append((ip_address, mac, hostname, vendor, packet[1][scapy.ARP])) + arp_results.append((ip_address, mac, hostname, device_vendor, packet[1][ARP])) # progress = int((i + 1) / len(arp_packets) * 100) # self.progress_updated.emit(progress) diff --git a/core/platform.py b/core/platform.py index dbcce68..bb14682 100644 --- a/core/platform.py +++ b/core/platform.py @@ -1,3 +1,5 @@ +"""Module Platform""" + import sys @@ -15,4 +17,4 @@ def get_os(): if os_platform.startswith('win'): return 'windows' - raise RuntimeError('Unsupported operating system.') \ No newline at end of file + raise RuntimeError('Unsupported operating system.') diff --git a/core/sniffer.py b/core/sniffer.py index f9bf00f..74a2c8b 100644 --- a/core/sniffer.py +++ b/core/sniffer.py @@ -1,17 +1,34 @@ -from PyQt6.QtCore import QObject, pyqtSignal as Signal +"""Module Sniffer""" + +from PyQt6.QtCore import QObject, pyqtSignal as Signal # pylint: disable=E0611 import scapy.all as scapy class PacketCollector(QObject): + """ + Class responsible for capturing network packets using Scapy. + Emits a signal when a packet is captured. + """ packetCaptured = Signal(str) def __init__(self, iface, ip_addr, parent=None): + """ + Initializes the PacketCollector. + + :param iface: Network interface to sniff on. + :param ip_addr: IP address to exclude from capture. + :param parent: Optional parent for QObject. + """ super().__init__(parent) self.iface = iface self.ip_addr = ip_addr self.running = False def start_capture(self): + """ + Starts capturing packets on the specified network interface. + The capture filters out ARP packets and packets originating from the specified IP address. + """ self.running = True scapy.sniff( iface=self.iface, @@ -21,12 +38,27 @@ def start_capture(self): store=False ) - def _stop_filter(self, packet): + def _stop_filter(self): + """ + Stop filter for the sniffing process. + Capture stops when 'running' is set to False. + + :param _: Placeholder for the packet argument (unused). + :return: Boolean indicating whether to stop the capture. + """ return not self.running def process_packet(self, packet): + """ + Processes a captured packet, emitting its summary as a signal. + + :param packet: The packet to process. + """ packet_summary = str(packet.summary()) self.packetCaptured.emit(packet_summary) def stop_capture(self): + """ + Stops capturing packets by setting the 'running' flag to False. + """ self.running = False