Skip to content

Commit

Permalink
Type hint arch/bpf
Browse files Browse the repository at this point in the history
  • Loading branch information
gpotter2 committed Nov 28, 2023
1 parent 5a2dbf0 commit 438f22d
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 68 deletions.
6 changes: 5 additions & 1 deletion .config/mypy/mypy_enabled.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ scapy/__main__.py
scapy/all.py
scapy/ansmachine.py
scapy/arch/__init__.py
scapy/arch/bpf/__init__.py
scapy/arch/bpf/consts.py
scapy/arch/bpf/core.py
scapy/arch/bpf/supersocket.py
scapy/arch/common.py
scapy/arch/libpcap.py
scapy/arch/linux.py
scapy/arch/unix.py
scapy/arch/solaris.py
scapy/arch/unix.py
scapy/arch/windows/__init__.py
scapy/arch/windows/native.py
scapy/arch/windows/structures.py
Expand Down
17 changes: 12 additions & 5 deletions scapy/arch/bpf/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
from scapy.libs.structures import bpf_program
from scapy.data import MTU

# Type hints
from typing import (
Any,
Callable,
)

SIOCGIFFLAGS = 0xc0206911
BPF_BUFFER_LENGTH = MTU

Expand All @@ -23,19 +29,20 @@
IOC_IN = 0x80000000
IOC_INOUT = IOC_IN | IOC_OUT

_th = lambda x: x if isinstance(x, int) else ctypes.sizeof(x)
_th = lambda x: x if isinstance(x, int) else ctypes.sizeof(x) # type: Callable[[Any], int] # noqa: E501


def _IOC(inout, group, num, len):
# type: (int, str, int, Any) -> int
return (inout |
((_th(len) & IOCPARM_MASK) << 16) |
(ord(group) << 8) | (num))


_IO = lambda g, n: _IOC(IOC_VOID, g, n, 0)
_IOR = lambda g, n, t: _IOC(IOC_OUT, g, n, t)
_IOW = lambda g, n, t: _IOC(IOC_IN, g, n, t)
_IOWR = lambda g, n, t: _IOC(IOC_INOUT, g, n, t)
_IO = lambda g, n: _IOC(IOC_VOID, g, n, 0) # type: Callable[[str, int], int]
_IOR = lambda g, n, t: _IOC(IOC_OUT, g, n, t) # type: Callable[[str, int, Any], int]
_IOW = lambda g, n, t: _IOC(IOC_IN, g, n, t) # type: Callable[[str, int, Any], int]
_IOWR = lambda g, n, t: _IOC(IOC_INOUT, g, n, t) # type: Callable[[str, int, Any], int]

# Length of some structures
_bpf_stat = 8
Expand Down
30 changes: 24 additions & 6 deletions scapy/arch/bpf/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,18 @@
InterfaceProvider,
NetworkInterface,
network_name,
_GlobInterfaceType,
)
from scapy.pton_ntop import inet_ntop

# Typing
from typing import (
Dict,
List,
Optional,
Tuple,
)

if LINUX:
raise OSError("BPF conflicts with Linux")

Expand Down Expand Up @@ -67,7 +76,10 @@ class if_nameindex(Structure):


def get_if_raw_addr(ifname):
"""Returns the IPv4 address configured on 'ifname', packed with inet_pton.""" # noqa: E501
# type: (_GlobInterfaceType) -> bytes
"""
Returns the IPv4 address configured on 'ifname', packed with inet_pton.
"""

ifname = network_name(ifname)

Expand Down Expand Up @@ -99,6 +111,7 @@ def get_if_raw_addr(ifname):


def get_if_raw_hwaddr(ifname):
# type: (_GlobInterfaceType) -> Tuple[int, bytes]
"""Returns the packed MAC address configured on 'ifname'."""

NULL_MAC_ADDRESS = b'\x00' * 6
Expand Down Expand Up @@ -128,19 +141,19 @@ def get_if_raw_hwaddr(ifname):
raise Scapy_Exception("No MAC address found on %s !" % ifname)

# Pack and return the MAC address
mac = addresses[0].split(' ')[1]
mac = [chr(int(b, 16)) for b in mac.split(':')]
mac = [int(b, 16) for b in addresses[0].split(' ')[1].split(':')]

# Check that the address length is correct
if len(mac) != 6:
raise Scapy_Exception("No MAC address found on %s !" % ifname)

return (ARPHDR_ETHER, ''.join(mac))
return (ARPHDR_ETHER, struct.pack("!HHHHHH", *mac))


# BPF specific functions

def get_dev_bpf():
# type: () -> Tuple[int, int]
"""Returns an opened BPF file object"""

# Get the first available BPF handle
Expand All @@ -160,6 +173,7 @@ def get_dev_bpf():


def attach_filter(fd, bpf_filter, iface):
# type: (int, str, _GlobInterfaceType) -> None
"""Attach a BPF filter to the BPF file descriptor"""
bp = compile_filter(bpf_filter, iface)
# Assign the BPF program to the interface
Expand All @@ -171,6 +185,7 @@ def attach_filter(fd, bpf_filter, iface):
# Interface manipulation functions

def _get_ifindex_list():
# type: () -> List[Tuple[str, int]]
"""
Returns a list containing (iface, index)
"""
Expand All @@ -189,6 +204,7 @@ def _get_ifindex_list():


def _get_if_flags(ifname):
# type: (_GlobInterfaceType) -> Optional[int]
"""Internal function to get interface flags"""
# Get interface flags
try:
Expand All @@ -206,6 +222,7 @@ class BPFInterfaceProvider(InterfaceProvider):
name = "BPF"

def _is_valid(self, dev):
# type: (NetworkInterface) -> bool
if not dev.flags & 0x1: # not IFF_UP
return False
# Get a BPF handle
Expand All @@ -228,17 +245,18 @@ def _is_valid(self, dev):
os.close(fd)

def load(self):
# type: () -> Dict[str, NetworkInterface]
from scapy.fields import FlagValue
data = {}
ips = in6_getifaddr()
for ifname, index in _get_ifindex_list():
try:
ifflags = _get_if_flags(ifname)
ifflags_int = _get_if_flags(ifname) or 0
mac = scapy.utils.str2mac(get_if_raw_hwaddr(ifname)[1])
ip = inet_ntop(socket.AF_INET, get_if_raw_addr(ifname))
except Scapy_Exception:
continue
ifflags = FlagValue(ifflags, _iff_flags)
ifflags = FlagValue(ifflags_int, _iff_flags)
if_data = {
"name": ifname,
"network_name": ifname,
Expand Down
Loading

0 comments on commit 438f22d

Please sign in to comment.