Skip to content

Commit

Permalink
removing a bunch of shitty gpt code :D
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberRoute committed Sep 28, 2024
1 parent 90e1e6f commit 88eabe1
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 97 deletions.
99 changes: 2 additions & 97 deletions core/arp_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
QListWidgetItem
)
from PySide6.QtGui import QIcon, QFont, QColor # pylint: disable=E0611
from PySide6.QtCore import QRunnable, Slot, Qt, QThreadPool, QTimer # pylint: disable=E0611
from PySide6.QtCore import Slot, Qt, QThreadPool, QTimer # pylint: disable=E0611
from PyQt6.QtCore import QThread, pyqtSignal # pylint: disable=E0611
from ui.ui_arpscan import Ui_DeviceDiscovery
from core import sniffer
from core import vendor
from core.platform import get_os
import core.networking as net
Expand Down Expand Up @@ -50,62 +49,6 @@ def __init__(self, ip_address, mac_address, hostname, device_vendor):
self.setCentralWidget(central_widget)


class Worker(QRunnable): # pylint: disable=too-few-public-methods
"""
A worker thread for starting the packet collector.
Attributes:
packet_collector: The object responsible for capturing network packets.
"""

def __init__(self, packet_collector):
"""
Initializes the Worker with the packet collector.
Args:
packet_collector: The packet collector object responsible for network packet capture.
"""
super().__init__()
self.packet_collector = packet_collector

@Slot()
def run(self):
"""
Runs the packet collector to start capturing packets.
"""
print("Sniffer Thread start")
self.packet_collector.start_capture()
print("Sniffer Thread complete")


class StopWorker(QRunnable): # pylint: disable=too-few-public-methods
"""
A worker thread to stop the packet collector.
Attributes:
packet_collector: The object responsible for capturing network packets.
"""

def __init__(self, packet_collector):
"""
Initializes the StopWorker with the packet collector.
Args:
packet_collector: The packet collector object responsible for network packet capture.
"""
super().__init__()
self.packet_collector = packet_collector

@Slot()
def run(self):
"""
Stops the packet collector.
"""
print("Stopping Sniffer Thread")
self.packet_collector.stop_capture()
print("Stopped Sniffer Thread")


class DeviceDiscoveryDialog(QDialog): # pylint: disable=too-many-instance-attributes
"""Device Discovery"""
def __init__(self, interface, oui_url, parent=None):
Expand Down Expand Up @@ -147,11 +90,6 @@ def __init__(self, interface, oui_url, parent=None):

self.add_list_widget_to_tab_1()

self.packet_collector = sniffer.PacketCollector(self.interface, net.get_ip_address())
self.packet_collector.packetCaptured.connect(self.add_packet_to_list)

# self.progress_label = QLabel("Progress: 0%")
# self._ui.verticalLayout.addWidget(self.progress_label)

def add_list_widget_to_tab_1(self):
"""Adds a QListWidget to the first tab of the UI."""
Expand Down Expand Up @@ -213,7 +151,6 @@ def start_scan(self):
"""start scanning"""
self.arp_scanner_thread = ARPScannerThread(self.interface, self.mac_vendor_lookup) # pylint: disable=attribute-defined-outside-init
self.arp_scanner_thread.finished.connect(self.handle_scan_results)
self.arp_scanner_thread.progress_updated.connect(self.update_progress)
self.arp_scanner_thread.verbose_output.connect(self.update_tab7_verbose_output)
self.arp_scanner_thread.start()

Expand All @@ -237,11 +174,6 @@ def handle_scan_results(self, results):
self._ui.listpkt.addItem(item)
self._ui.scan.setEnabled(True)

@Slot(int)
def update_progress(self, value):
"""update progress unused"""
self.progress_label.setText(f"Progress: {value}%")

@Slot(str)
def update_tab7_verbose_output(self, verbose_output):
"""update tab7"""
Expand All @@ -265,34 +197,10 @@ def quit_application(self):
if hasattr(self, 'arp_scanner_thread') and self.arp_scanner_thread.isRunning():
self.arp_scanner_thread.terminate()
self.arp_scanner_thread.wait()

stop_worker = StopWorker(self.packet_collector)
self.threadpool.start(stop_worker)
if hasattr(self, 'arp_scanner_thread') and self.arp_scanner_thread.isRunning():
self.arp_scanner_thread.terminate()
self.arp_scanner_thread.wait()

QTimer.singleShot(2000, self.close)

class ARPScanner:
"""Arp Scanner"""
@staticmethod
def calculate_network_cidr(ip_address, subnet_mask):
"""calculate network cidr"""
# Split the IP address and subnet mask into octets
ip_octets = [int(octet) for octet in ip_address.split('.')]
subnet_octets = [int(octet) for octet in subnet_mask.split('.')]

# Perform bitwise AND operation on corresponding octets
network_octets = [ip_octets[i] & subnet_octets[i] for i in range(4)]

# Calculate the number of set bits in the subnet mask
prefix_length = sum(bin(octet).count('1') for octet in subnet_octets)

# Format the network address in CIDR notation
network_address = '.'.join(map(str, network_octets)) + '/' + str(prefix_length)

return network_address

@staticmethod
def get_hostname(ip_address):
Expand Down Expand Up @@ -322,7 +230,7 @@ def run(self):
ip_address = get_if_addr(self.interface)
try:
netmask = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['netmask']
network = ARPScanner.calculate_network_cidr(ip_address, netmask)
network = net.calculate_network_cidr(ip_address, netmask)
except KeyError:
self.finished.emit([])
return
Expand Down Expand Up @@ -353,7 +261,4 @@ def run(self):
device_vendor = self.mac_vendor_lookup.lookup_vendor(mac)
hostname = ARPScanner.get_hostname(ip_address)
arp_results.append((ip_address, mac, hostname, device_vendor, packet[1][ARP]))

# progress = int((i + 1) / len(arp_packets) * 100)
# self.progress_updated.emit(progress)
self.finished.emit(arp_results)
19 changes: 19 additions & 0 deletions core/networking.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def disable_ip_forwarding():

assert subprocess.call(cmd) == 0


def get_ip_address():
"""Get the IP address of the current machine."""
try:
Expand All @@ -58,3 +59,21 @@ def get_ip_address():
except OSError as e:
print(f"Error while getting IP address: {e}")
return None


def calculate_network_cidr(ip_address, subnet_mask):
"""calculate network cidr"""
# Split the IP address and subnet mask into octets
ip_octets = [int(octet) for octet in ip_address.split('.')]
subnet_octets = [int(octet) for octet in subnet_mask.split('.')]

# Perform bitwise AND operation on corresponding octets
network_octets = [ip_octets[i] & subnet_octets[i] for i in range(4)]

# Calculate the number of set bits in the subnet mask
prefix_length = sum(bin(octet).count('1') for octet in subnet_octets)

# Format the network address in CIDR notation
network_address = '.'.join(map(str, network_octets)) + '/' + str(prefix_length)

return network_address

0 comments on commit 88eabe1

Please sign in to comment.