forked from vmag/srv6_tracert
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsrv6_traceroute.py
106 lines (84 loc) · 4.29 KB
/
srv6_traceroute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/env python3
from argparse import ArgumentParser
from prettytable import PrettyTable
import srv6_tracert
import yaml
import json, netaddr
import os, sys
def parse_config(filename):
with open(filename) as hosts_file:
return yaml.safe_load(hosts_file)
def parse_sids(filename):
with open(filename) as json_file:
data = json.load(json_file)
sidlist = []
for p in data['sids']:
if netaddr.valid_ipv6(p['ipv6']) is True:
sidlist.append(p['ipv6'])
else:
print('%s is invalid IPv6 address' %p['ipv6'])
sys.exit(1)
return sidlist
def print_results(results):
t = PrettyTable(['TTL', 'ASN', 'ICMP dst', 'SR dst', 'Latency'])
for ttl, res in results.items():
t.add_row([ttl, res["ASN"], res["ICMP"], res["SR"], res["latency"]])
print(t)
def traceroute(destination, sr_hops, packetsize, verbosity, timeout):
result_table = {}
print ("======= Starting ICMP (packet size: %i) traceroute to %s =======" % (packetsize,
destination))
max_ttl, result_table = srv6_tracert.icmp_traceroute(destination, result_table,
packetsize, verbosity, timeout)
print ("======= Starting SRv6 (packet size: %i) traceroute to %s =======" % (packetsize,
destination))
result_table = srv6_tracert.srv6_traceroute(destination, result_table, packetsize,
max_ttl, sr_hops, verbosity, timeout)
return result_table
def multiple_traceroute(destinations, sr_hops, packetsize, verbosity, timeout):
for destination in destinations:
print ("Performing traceroute on server %s (%s)" % (destination["name"],
destination["ipv6"]))
results = traceroute(destination["ipv6"], sr_hops, packetsize, verbosity, timeout)
print ("Results of traceroute to server %s (%s)" % (destination["name"],
destination["ipv6"]))
print_results(results)
def main():
parser = ArgumentParser(description="SRv6 traceroute script")
dest_group = parser.add_mutually_exclusive_group(required=True)
dest_group.add_argument('-d', '--destination', action="store", help="Destination host IPv6")
dest_group.add_argument('-f', '--destination_file', action="store",
help="File with destination hosts IPv6")
sids_group = parser.add_mutually_exclusive_group(required=True)
sids_group.add_argument('-l', '--list_of_sids', action="store",
help="File with SID LIST")
sids_group.add_argument('-c', '--count', action="store", default=3,
help="Count of random IPv6 SR hops", type=int)
parser.add_argument('-s', '--packetsize', action="store", default=8,
help="ICMP echo packet data size", required=False, type=int)
parser.add_argument('-t', '--timeout', action="store", default=2,
help="Scapy packet timeout", required=False, type=int)
parser.add_argument('-v', '--verbosity', action="store", default=0,
help="Scapy verbosity", required=False, type=int)
args = parser.parse_args()
if args.list_of_sids is not None:
sids_file = args.list_of_sids
if not os.path.isfile(sids_file):
print("SID LIST file does not exist")
sys.exit(1)
sr_hops = parse_sids(sids_file)
else:
sr_hops = srv6_tracert.generate_hops(args.count)
if args.destination_file is not None:
dest_file = args.destination_file
if not os.path.isfile(dest_file):
print("Destination host file does not exist")
sys.exit(1)
destinations = parse_config(dest_file)
multiple_traceroute(destinations, sr_hops, args.packetsize, args.verbosity, args.timeout)
else:
results = traceroute(args.destination, sr_hops, args.packetsize, args.verbosity,
args.timeout)
print_results(results)
if __name__ == "__main__":
main()