forked from kylemcdonald/FreeWifi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwifi-users.py
133 lines (111 loc) · 4.08 KB
/
wifi-users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from __future__ import print_function
import subprocess
import re
import sys
import argparse
import os
from collections import defaultdict
import netifaces
from netaddr import EUI, mac_unix_expanded
from wireless import Wireless
from tqdm import tqdm
# Message written to stderr by main() when the wireless lookup returns no SSID.
NO_SSID = 'No SSID is currently available. Connect to the network first.'
def eprint(*args, **kwargs):
    """Write a message to standard error using ``print`` semantics.

    Positional and keyword arguments are forwarded to ``print`` unchanged.
    The ``file`` keyword is supplied by this function, so callers must not
    pass it themselves (doing so raises ``TypeError``).
    """
    stream = sys.stderr  # looked up per call so later redirection is honoured
    print(*args, file=stream, **kwargs)
def run_process(cmd, err=False):
    """Run *cmd* and lazily yield each line of its standard output.

    The child is started on the first iteration. Every line the child
    writes is yielded, including output still buffered in the pipe after
    the child has exited; no empty end-of-file marker is yielded.

    Args:
        cmd: Argument list passed to ``subprocess.Popen``.
        err: When true, the child's stderr is merged into the yielded
            stream. Otherwise stderr is discarded.

    Yields:
        Each output line as raw ``bytes``, trailing newline included.
    """
    # Only open the null device when stderr is actually being discarded, and
    # make sure the handle is closed again once the generator finishes.
    devnull = None if err else open(os.devnull, 'w')
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if err else devnull)
        drained = False
        try:
            # readline() returns b'' only at EOF, so this drains the pipe
            # completely instead of stopping as soon as the child has exited.
            for line in iter(proc.stdout.readline, b''):
                yield line
            drained = True
        finally:
            proc.stdout.close()
            if not drained and proc.poll() is None:
                # The consumer stopped early (e.g. Ctrl-C); do not leave the
                # child running in the background.
                try:
                    proc.terminate()
                except OSError:
                    pass  # the child exited in the meantime
            proc.wait()  # reap the child so it does not linger as a zombie
    finally:
        if devnull is not None:
            devnull.close()
def main(args):
    """Capture wireless traffic and report per-client byte totals.

    Looks up the current SSID and the default gateway, runs ``tcpdump`` on
    the gateway's interface, and sums the reported packet lengths for every
    MAC address seen talking to a known network address (the gateway or a
    BSSID announcing the current SSID). Status messages go to stderr; the
    final ``<mac>\\t<n> bytes`` lines go to stdout, largest total last.

    Args:
        args: ``sys.argv``-style list. Element 0 (the program name) is
            skipped and the remainder is parsed as command-line options.
            An empty or ``None`` value falls back to ``sys.argv``.
    """
    parser = argparse.ArgumentParser(
        description='Find active users on the current wireless network.')
    parser.add_argument('-p', '--packets',
                        default=1000,
                        type=int,
                        help='How many packets to capture.')
    parser.add_argument('-r', '--results',
                        default=None,
                        type=int,
                        help='How many results to show.')
    # Honour the argument list handed in by the caller instead of silently
    # re-reading sys.argv and shadowing the parameter.
    options = parser.parse_args(args[1:] if args else None)

    try:
        wireless = Wireless()
        ssid = wireless.current()
    except Exception:
        # `Exception` rather than a bare except, so Ctrl-C / SystemExit are
        # not reported as an SSID lookup failure.
        eprint('Couldn\'t get current wireless SSID.')
        raise
    if ssid is None:
        eprint(NO_SSID)
        return
    eprint('SSID: {}'.format(ssid))

    # Addresses that belong to the network itself (gateway, access points).
    network_macs = set()

    # The capture interface comes from the default gateway entry, so a missing
    # gateway is fatal; this lookup is kept separate from the best-effort MAC
    # lookup below so `iface` is always bound before it is used.
    try:
        gw = netifaces.gateways()['default'][netifaces.AF_INET]
    except KeyError:
        eprint('No gateway is available: {}'.format(netifaces.gateways()))
        return
    gw_ip, iface = gw[0], gw[1]

    # Best effort: without the gateway MAC the capture can still learn
    # network addresses from BSSIDs, so a failure here is only reported.
    try:
        gw_arp = subprocess.check_output(['arp', '-n', str(gw_ip)])
        gw_arp = gw_arp.decode('utf-8')
        gw_mac = EUI(re.search(' at (.+) on ', gw_arp).group(1))
        gw_mac.dialect = mac_unix_expanded
        network_macs.add(gw_mac)
        eprint('Gateway: {}'.format(gw_mac))
    except Exception:
        eprint('Error getting gateway mac address.')

    # Raw strings: `\S` and `\d` are regex escapes, not string escapes.
    bssid_re = re.compile(r' BSSID:(\S+) ')
    mac_re = re.compile(r'(SA|DA|BSSID):(([\dA-F]{2}:){5}[\dA-F]{2})', re.I)
    length_re = re.compile(r' length (\d+)')

    data_totals = defaultdict(int)

    # -I: monitor mode, -l: line-buffered output, -e: print link-level
    # headers (see tcpdump(1)); -c stops after the requested packet count.
    cmd = ['tcpdump', '-i', str(iface), '-Ile', '-c', str(options.packets)]
    try:
        bar_format = '{n_fmt}/{total_fmt} {bar} {remaining}'
        for line in tqdm(run_process(cmd),
                         total=options.packets,
                         bar_format=bar_format):
            # One undecodable byte must not abort the whole capture.
            line = line.decode('utf-8', 'replace')

            # find BSSID for SSID
            if ssid in line:
                bssid_match = bssid_re.search(line)
                if bssid_match:
                    bssid = bssid_match.group(1)
                    if 'Broadcast' not in bssid:
                        network_macs.add(EUI(bssid))

            # count data packets
            length_match = length_re.search(line)
            if not length_match:
                continue
            length = int(length_match.group(1))
            macs = {EUI(match[1]) for match in mac_re.findall(line)}
            if not macs:
                continue
            leftover = macs - network_macs
            # Only count the packet when at least one of its addresses is a
            # known network address; the bytes are credited to the others.
            if len(leftover) < len(macs):
                for mac in leftover:
                    data_totals[mac] += length
    except subprocess.CalledProcessError:
        eprint('Error collecting packets.')
        raise
    except KeyboardInterrupt:
        # Ctrl-C ends the capture early; report what was collected so far.
        pass

    totals_sorted = sorted(data_totals.items(),
                           key=lambda x: x[1],
                           reverse=True)

    eprint('Total of {} user(s)'.format(len(totals_sorted)))

    # Keep the top `results` entries, printed in ascending order so the
    # heaviest user ends up on the last line.
    for mac, total in reversed(totals_sorted[:options.results]):
        mac.dialect = mac_unix_expanded
        if total > 0:
            print('{}\t{} bytes'.format(mac, total))
if __name__ == '__main__':
    # Script entry point: Ctrl-C anywhere in main() is treated as a normal
    # way to stop, so it is swallowed and the process exits via sys.exit().
    try:
        main(sys.argv)
    except KeyboardInterrupt:
        pass
    sys.exit()