Skip to content

Commit

Permalink
code is still messy, but at least I should get a build
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberRoute committed Sep 27, 2024
1 parent e8dd166 commit 8a2dc95
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 27 deletions.
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[MASTER]
init-hook='import sys; sys.path.append(".")'
extension-pkg-allow-list=netifaces
85 changes: 61 additions & 24 deletions core/arp_scanner.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""
Arp Scanner
Module Arp Scanner
"""
import io
import sys
import socket
import netifaces
import scapy.all as scapy
from scapy.all import arping, ARP, get_if_addr # pylint: disable=E0611
from PySide6.QtWidgets import ( # pylint: disable=E0611
QMainWindow,
QVBoxLayout,
Expand Down Expand Up @@ -50,42 +50,64 @@ def __init__(self, ip_address, mac_address, hostname, device_vendor):
self.setCentralWidget(central_widget)


class Worker(QRunnable):
class Worker(QRunnable): # pylint: disable=too-few-public-methods
"""
Worker thread
A worker thread for starting the packet collector.
Attributes:
packet_collector: The object responsible for capturing network packets.
"""

def __init__(self, packet_collector):
"""
Initializes the Worker with the packet collector.
Args:
packet_collector: The packet collector object responsible for network packet capture.
"""
super().__init__()
self.packet_collector = packet_collector

@Slot()
def run(self):
"""
Runs the packet collector to start capturing packets.
"""
print("Sniffer Thread start")
self.packet_collector.start_capture()
print("Sniffer Thread complete")


class StopWorker(QRunnable):
class StopWorker(QRunnable): # pylint: disable=too-few-public-methods
"""
Worker thread to stop the packet collector
A worker thread to stop the packet collector.
Attributes:
packet_collector: The object responsible for capturing network packets.
"""

def __init__(self, packet_collector):
"""
Initializes the StopWorker with the packet collector.
Args:
packet_collector: The packet collector object responsible for network packet capture.
"""
super().__init__()
self.packet_collector = packet_collector

@Slot()
def run(self):
"""
Code to stop the packet collector
Stops the packet collector.
"""
print("Stopping Sniffer Thread")
self.packet_collector.stop_capture()
print("Stopped Sniffer Thread")


class DeviceDiscoveryDialog(QDialog):
class DeviceDiscoveryDialog(QDialog): # pylint: disable=too-many-instance-attributes
"""Device Discovery"""
def __init__(self, interface, oui_url, parent=None):
super().__init__(parent)
self.interface = interface
Expand All @@ -112,7 +134,7 @@ def __init__(self, interface, oui_url, parent=None):
self.vendor = None

self.threadpool = QThreadPool()
print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())
print(f"Multithreading with maximum {self.threadpool.maxThreadCount()} threads")

self._ui.scan.setEnabled(True)

Expand All @@ -132,6 +154,7 @@ def __init__(self, interface, oui_url, parent=None):
# self._ui.verticalLayout.addWidget(self.progress_label)

def add_list_widget_to_tab_1(self):
"""Adds a QListWidget to the first tab of the UI."""
self.list_widget_tab7 = QListWidget()
tab7_layout = QVBoxLayout(self._ui.tab_1)
tab7_layout.addWidget(self.list_widget_tab7)
Expand All @@ -148,15 +171,16 @@ def add_list_widget_to_tab_1(self):
# self.list_widget_tab7.addItem(description_item_2)

def add_packet_to_list(self, packet_summary):
"""Adds a packet summary to the list widget in the first tab."""
packet_item = QListWidgetItem(packet_summary)
packet_item.setBackground(QColor(Qt.black))
packet_item.setForeground(QColor(Qt.white))
self.list_widget_tab7.addItem(packet_item)

@Slot(QListWidgetItem)
def open_device_details(self, item):
"""click on device open another window with details"""
selected_text = item.text()

parts = selected_text.split()

if len(parts) >= 4:
Expand All @@ -165,7 +189,7 @@ def open_device_details(self, item):
self.hostname = parts[2]
self.vendor = " ".join(parts[3:])

self.device_details_window = DeviceDetailsWindow(
self.device_details_window = DeviceDetailsWindow( # pylint: disable=attribute-defined-outside-init
self.ip_address,
self.mac,
self.hostname,
Expand All @@ -177,22 +201,25 @@ def open_device_details(self, item):

@Slot()
def toggle_scan(self):
"""keep scannning all local network every 1s"""
self._ui.scan.setEnabled(False)
self.timer_arp = QTimer(self)
self.timer_arp = QTimer(self) # pylint: disable=attribute-defined-outside-init
self.timer_arp.setInterval(1000)
self.timer_arp.timeout.connect(self.start_scan)
self.timer_arp.start()

@Slot()
def start_scan(self):
self.arp_scanner_thread = ARPScannerThread(self.interface, self.mac_vendor_lookup)
"""start scanning"""
self.arp_scanner_thread = ARPScannerThread(self.interface, self.mac_vendor_lookup) # pylint: disable=attribute-defined-outside-init
self.arp_scanner_thread.finished.connect(self.handle_scan_results)
self.arp_scanner_thread.progress_updated.connect(self.update_progress)
self.arp_scanner_thread.verbose_output.connect(self.update_tab7_verbose_output)
self.arp_scanner_thread.start()

@Slot(list)
def handle_scan_results(self, results):
"""update scan results"""
for ip_address, mac, hostname, device_vendor, packet in results:
label = f"{ip_address} {mac} {hostname}, {device_vendor}"
items = self._ui.list.findItems(label, Qt.MatchExactly)
Expand All @@ -212,10 +239,12 @@ def handle_scan_results(self, results):

@Slot(int)
def update_progress(self, value):
"""update progress unused"""
self.progress_label.setText(f"Progress: {value}%")

@Slot(str)
def update_tab7_verbose_output(self, verbose_output):
"""update tab7"""
# Update the list_widget_tab7 with verbose output
font = QFont()
font.setPointSize(12)
Expand All @@ -229,6 +258,7 @@ def update_tab7_verbose_output(self, verbose_output):
self.list_widget_tab7.addItem(item)

def quit_application(self):
"""quit the app"""
self._ui.quit.setEnabled(False)
net.disable_ip_forwarding()

Expand All @@ -245,8 +275,10 @@ def quit_application(self):
QTimer.singleShot(2000, self.close)

class ARPScanner:
"""Arp Scanner"""
@staticmethod
def calculate_network_cidr(ip_address, subnet_mask):
"""calculate network cidr"""
# Split the IP address and subnet mask into octets
ip_octets = [int(octet) for octet in ip_address.split('.')]
subnet_octets = [int(octet) for octet in subnet_mask.split('.')]
Expand All @@ -264,13 +296,17 @@ def calculate_network_cidr(ip_address, subnet_mask):

@staticmethod
def get_hostname(ip_address):
"get hostname"
try:
hostname = socket.gethostbyaddr(ip_address)[0]
return hostname
except Exception as e:
return "N/A" # Return "N/A" if hostname retrieval fails
except socket.herror:
return "N/A"
except socket.gaierror:
return "N/A"

class ARPScannerThread(QThread):
class ARPScannerThread(QThread): # pylint: disable=too-few-public-methods
"""Executing arp scan in separate thread"""
finished = pyqtSignal(list)
progress_updated = pyqtSignal(int)
verbose_output = pyqtSignal(str) # Signal to emit verbose output
Expand All @@ -282,7 +318,8 @@ def __init__(self, interface, mac_vendor_lookup, timeout=1):
self.timeout = timeout

def run(self):
ip_address = scapy.get_if_addr(self.interface)
"run the scan"
ip_address = get_if_addr(self.interface)
try:
netmask = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['netmask']
network = ARPScanner.calculate_network_cidr(ip_address, netmask)
Expand All @@ -296,26 +333,26 @@ def run(self):
# Redirect stdout to capture Scapy output
sys.stdout = io.StringIO()

arp_packets = scapy.arping(network, timeout=self.timeout, verbose=1)[0]
arp_packets = arping(network, timeout=self.timeout, verbose=1)[0]

# Get the verbose output
verbose_output = sys.stdout.getvalue()
self.verbose_output.emit(verbose_output) # Emit the verbose output

except Exception as e:
except Exception as e: # pylint: disable=broad-exception-caught
print(f"Error during ARP scan: {e}")
self.finished.emit([])
return
finally:
sys.stdout = original_stdout # Reset the standard output to its original state

for _, packet in enumerate(arp_packets):
if packet[1].haslayer(scapy.ARP):
ip_address = packet[1][scapy.ARP].psrc
mac = packet[1][scapy.ARP].hwsrc
vendor = self.mac_vendor_lookup.lookup_vendor(mac)
if packet[1].haslayer(ARP):
ip_address = packet[1][ARP].psrc
mac = packet[1][ARP].hwsrc
device_vendor = self.mac_vendor_lookup.lookup_vendor(mac)
hostname = ARPScanner.get_hostname(ip_address)
arp_results.append((ip_address, mac, hostname, vendor, packet[1][scapy.ARP]))
arp_results.append((ip_address, mac, hostname, device_vendor, packet[1][ARP]))

# progress = int((i + 1) / len(arp_packets) * 100)
# self.progress_updated.emit(progress)
Expand Down
4 changes: 3 additions & 1 deletion core/platform.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Module Platform"""

import sys


Expand All @@ -15,4 +17,4 @@ def get_os():
if os_platform.startswith('win'):
return 'windows'

raise RuntimeError('Unsupported operating system.')
raise RuntimeError('Unsupported operating system.')
36 changes: 34 additions & 2 deletions core/sniffer.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
from PyQt6.QtCore import QObject, pyqtSignal as Signal
"""Module Sniffer"""

from PyQt6.QtCore import QObject, pyqtSignal as Signal # pylint: disable=E0611
import scapy.all as scapy


class PacketCollector(QObject):
"""
Class responsible for capturing network packets using Scapy.
Emits a signal when a packet is captured.
"""
packetCaptured = Signal(str)

def __init__(self, iface, ip_addr, parent=None):
"""
Initializes the PacketCollector.
:param iface: Network interface to sniff on.
:param ip_addr: IP address to exclude from capture.
:param parent: Optional parent for QObject.
"""
super().__init__(parent)
self.iface = iface
self.ip_addr = ip_addr
self.running = False

def start_capture(self):
"""
Starts capturing packets on the specified network interface.
The capture filters out ARP packets and packets originating from the specified IP address.
"""
self.running = True
scapy.sniff(
iface=self.iface,
Expand All @@ -21,12 +38,27 @@ def start_capture(self):
store=False
)

def _stop_filter(self, packet):
def _stop_filter(self):
"""
Stop filter for the sniffing process.
Capture stops when 'running' is set to False.
:param _: Placeholder for the packet argument (unused).
:return: Boolean indicating whether to stop the capture.
"""
return not self.running

def process_packet(self, packet):
"""
Processes a captured packet, emitting its summary as a signal.
:param packet: The packet to process.
"""
packet_summary = str(packet.summary())
self.packetCaptured.emit(packet_summary)

def stop_capture(self):
"""
Stops capturing packets by setting the 'running' flag to False.
"""
self.running = False

0 comments on commit 8a2dc95

Please sign in to comment.