diff --git a/main.go b/main.go index 1e4ae0f8..32dfc431 100644 --- a/main.go +++ b/main.go @@ -68,7 +68,7 @@ func main() { RDns: *rdnsenable, IPGeoSource: ipgeo.GetSource(*dataOrigin), Timeout: 2 * time.Second, - + RoutePath: *routePath, //Quic: false, } diff --git a/reporter/reporter.go b/reporter/reporter.go index 1c894304..c9c13a58 100644 --- a/reporter/reporter.go +++ b/reporter/reporter.go @@ -42,14 +42,16 @@ func experimentTag() { func (r *reporter) generateRouteReportNode(ip string, ipGeoData ipgeo.IPGeoData) (routeReportNode, error) { rpn := routeReportNode{} - ptr, err := net.LookupAddr(ip) - if err == nil { - if strings.Contains(strings.ToLower(ptr[0]), "ix") { - rpn.ix = true - } else { - rpn.ix = false + go func() { + ptr, err := net.LookupAddr(ip) + if err == nil { + if strings.Contains(strings.ToLower(ptr[0]), "ix") { + rpn.ix = true + } else { + rpn.ix = false + } } - } + }() if strings.Contains(strings.ToLower(ipGeoData.Isp), "exchange") || strings.Contains(strings.ToLower(ipGeoData.Isp), "ix") || strings.Contains(strings.ToLower(ipGeoData.Owner), "exchange") || strings.Contains(strings.ToLower(ipGeoData.Owner), "ix") { rpn.ix = true @@ -60,7 +62,7 @@ func (r *reporter) generateRouteReportNode(ip string, ipGeoData ipgeo.IPGeoData) rpn.asn = ipGeoData.Asnumber } // 无论最后一跳是否为存在地理位置信息(AnyCast),都应该给予显示 - if ipGeoData.Country == "" || ipGeoData.City == "" && ip != r.targetIP { + if ipGeoData.Country == "" || ipGeoData.City == "" || ipGeoData.City == "-" && ip != r.targetIP { return rpn, errors.New("GeoData Search Failed") } else { if ipGeoData.City == "" { diff --git a/trace/icmp.go b/trace/icmp_ipv4.go similarity index 93% rename from trace/icmp.go rename to trace/icmp_ipv4.go index 2ec4285c..efa24e9b 100644 --- a/trace/icmp.go +++ b/trace/icmp_ipv4.go @@ -1,9 +1,11 @@ package trace import ( + "fmt" "log" "net" "os" + "strconv" "sync" "time" @@ -92,7 +94,7 @@ func (t *ICMPTracer) listenICMP() { case ipv4.ICMPTypeEchoReply: t.handleICMPMessage(msg, 1, rm.Body.(*icmp.Echo).Data) default: - log.Println("received icmp message of unknown type", rm.Type) + // log.Println("received icmp message of unknown type", rm.Type) } } } @@ -160,6 +162,10 @@ func (t *ICMPTracer) send(fork workFork) error { t.inflightRequestLock.Unlock() }() + if fork.num == 0 && t.Config.RoutePath { + fmt.Print(strconv.Itoa(fork.ttl)) + } + select { case <-t.ctx.Done(): return nil @@ -186,6 +192,9 @@ func (t *ICMPTracer) send(fork workFork) error { h.RTT = rtt h.fetchIPData(t.Config) + if t.Config.RoutePath { + HopPrinter(h) + } t.res.add(h) @@ -201,6 +210,9 @@ func (t *ICMPTracer) send(fork workFork) error { RTT: 0, Error: ErrHopLimitTimeout, }) + if t.Config.RoutePath { + fmt.Println("\t" + "*") + } } return nil diff --git a/trace/icmp_ipv6.go b/trace/icmp_ipv6.go new file mode 100644 index 00000000..93d78ec1 --- /dev/null +++ b/trace/icmp_ipv6.go @@ -0,0 +1,219 @@ +package trace + +import ( + "fmt" + "log" + "net" + "os" + "strconv" + "sync" + "time" + + "golang.org/x/net/context" + "golang.org/x/net/icmp" + "golang.org/x/net/ipv6" + "golang.org/x/sync/semaphore" +) + +type ICMPTracerv6 struct { + Config + wg sync.WaitGroup + res Result + ctx context.Context + inflightRequest map[int]chan Hop + inflightRequestLock sync.Mutex + icmpListen net.PacketConn + workFork workFork + final int + finalLock sync.Mutex + + sem *semaphore.Weighted +} + +func (t *ICMPTracerv6) Execute() (*Result, error) { + if len(t.res.Hops) > 0 { + return &t.res, ErrTracerouteExecuted + } + + var err error + + t.icmpListen, err = net.ListenPacket("ip6:58", "::") + if err != nil { + return &t.res, err + } + defer t.icmpListen.Close() + + var cancel context.CancelFunc + t.ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + t.inflightRequest = make(map[int]chan Hop) + t.final = -1 + + go t.listenICMP() + + t.sem = semaphore.NewWeighted(int64(t.ParallelRequests)) + + for t.workFork.ttl = 1; t.workFork.ttl <= t.MaxHops; t.workFork.ttl++ { + for i := 0; i < t.NumMeasurements; i++ { + t.wg.Add(1) + go t.send(workFork{t.workFork.ttl, i}) + } + // 一组TTL全部退出(收到应答或者超时终止)以后,再进行下一个TTL的包发送 + t.wg.Wait() + t.workFork.num = 0 + } + t.res.reduce(t.final) + + return &t.res, nil +} + +func (t *ICMPTracerv6) listenICMP() { + lc := NewPacketListener(t.icmpListen, t.ctx) + go lc.Start() + for { + select { + case <-t.ctx.Done(): + return + case msg := <-lc.Messages: + if msg.N == nil { + continue + } + rm, err := icmp.ParseMessage(58, msg.Msg[:*msg.N]) + if err != nil { + log.Println(err) + continue + } + // log.Println(msg.Peer) + switch rm.Type { + case ipv6.ICMPTypeTimeExceeded: + t.handleICMPMessage(msg, 0, rm.Body.(*icmp.TimeExceeded).Data) + case ipv6.ICMPTypeEchoReply: + t.handleICMPMessage(msg, 1, rm.Body.(*icmp.Echo).Data) + default: + // log.Println("received icmp message of unknown type", rm.Type) + } + } + } + +} + +func (t *ICMPTracerv6) handleICMPMessage(msg ReceivedMessage, icmpType int8, data []byte) { + + t.inflightRequestLock.Lock() + defer t.inflightRequestLock.Unlock() + ch, ok := t.inflightRequest[t.workFork.num] + t.workFork.num += 1 + if !ok { + return + } + ch <- Hop{ + Success: true, + Address: msg.Peer, + } +} + +func (t *ICMPTracerv6) send(fork workFork) error { + err := t.sem.Acquire(context.Background(), 1) + if err != nil { + return err + } + + defer t.sem.Release(1) + + defer t.wg.Done() + if t.final != -1 && fork.ttl > t.final { + return nil + } + + icmpHeader := icmp.Message{ + Type: ipv6.ICMPTypeEchoRequest, Code: 0, + Body: &icmp.Echo{ + ID: os.Getpid() & 0xffff, + Data: []byte("HELLO-R-U-THERE"), + }, + } + + p := ipv6.NewPacketConn(t.icmpListen) + + icmpHeader.Body.(*icmp.Echo).Seq = fork.ttl + p.SetHopLimit(fork.ttl) + + wb, err := icmpHeader.Marshal(nil) + if err != nil { + log.Fatal(err) + } + + start := time.Now() + if _, err := t.icmpListen.WriteTo(wb, &net.IPAddr{IP: t.DestIP}); err != nil { + log.Fatal(err) + } + if err := t.icmpListen.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil { + log.Fatal(err) + } + t.inflightRequestLock.Lock() + hopCh := make(chan Hop) + t.inflightRequest[fork.num] = hopCh + t.inflightRequestLock.Unlock() + + defer func() { + t.inflightRequestLock.Lock() + close(hopCh) + delete(t.inflightRequest, fork.ttl) + t.inflightRequestLock.Unlock() + }() + + if fork.num == 0 && t.Config.RoutePath { + fmt.Print(strconv.Itoa(fork.ttl)) + } + + select { + case <-t.ctx.Done(): + return nil + case h := <-hopCh: + rtt := time.Since(start) + if t.final != -1 && fork.ttl > t.final { + return nil + } + if addr, ok := h.Address.(*net.IPAddr); ok && addr.IP.Equal(t.DestIP) { + t.finalLock.Lock() + if t.final == -1 || fork.ttl < t.final { + t.final = fork.ttl + } + t.finalLock.Unlock() + } else if addr, ok := h.Address.(*net.TCPAddr); ok && addr.IP.Equal(t.DestIP) { + t.finalLock.Lock() + if t.final == -1 || fork.ttl < t.final { + t.final = fork.ttl + } + t.finalLock.Unlock() + } + + h.TTL = fork.ttl + h.RTT = rtt + + h.fetchIPData(t.Config) + if t.Config.RoutePath { + HopPrinter(h) + } + + t.res.add(h) + + case <-time.After(t.Timeout): + if t.final != -1 && fork.ttl > t.final { + return nil + } + + t.res.add(Hop{ + Success: false, + Address: nil, + TTL: fork.ttl, + RTT: 0, + Error: ErrHopLimitTimeout, + }) + if t.Config.RoutePath { + fmt.Println("\t" + "*") + } + } + + return nil +} diff --git a/trace/tcp.go b/trace/tcp_ipv4.go similarity index 100% rename from trace/tcp.go rename to trace/tcp_ipv4.go diff --git a/trace/tcp_ipv6.go b/trace/tcp_ipv6.go new file mode 100644 index 00000000..3be1fd5f --- /dev/null +++ b/trace/tcp_ipv6.go @@ -0,0 +1,269 @@ +package trace + +import ( + "log" + "math" + "math/rand" + "net" + "sync" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/xgadget-lab/nexttrace/listener_channel" + "github.com/xgadget-lab/nexttrace/util" + "golang.org/x/net/context" + "golang.org/x/net/icmp" + "golang.org/x/net/ipv6" + "golang.org/x/sync/semaphore" +) + +type TCPTracerv6 struct { + Config + wg sync.WaitGroup + res Result + ctx context.Context + inflightRequest map[int]chan Hop + inflightRequestLock sync.Mutex + SrcIP net.IP + icmp net.PacketConn + tcp net.PacketConn + + final int + finalLock sync.Mutex + + sem *semaphore.Weighted +} + +func (t *TCPTracerv6) Execute() (*Result, error) { + if len(t.res.Hops) > 0 { + return &t.res, ErrTracerouteExecuted + } + + t.SrcIP, _ = util.LocalIPPort(t.DestIP) + + var err error + t.tcp, err = net.ListenPacket("ip6:tcp", t.SrcIP.String()) + if err != nil { + return nil, err + } + t.icmp, err = icmp.ListenPacket("ip6:53", "::") + if err != nil { + return &t.res, err + } + defer t.icmp.Close() + + var cancel context.CancelFunc + t.ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + t.inflightRequest = make(map[int]chan Hop) + t.final = -1 + + go t.listenICMP() + go t.listenTCP() + + t.sem = semaphore.NewWeighted(int64(t.ParallelRequests)) + + for ttl := 1; ttl <= t.MaxHops; ttl++ { + for i := 0; i < t.NumMeasurements; i++ { + t.wg.Add(1) + go t.send(ttl) + + } + time.Sleep(1 * time.Millisecond) + } + t.wg.Wait() + t.res.reduce(t.final) + + return &t.res, nil +} + +func (t *TCPTracerv6) listenICMP() { + lc := NewPacketListener(t.icmp, t.ctx) + go lc.Start() + for { + select { + case <-t.ctx.Done(): + return + case msg := <-lc.Messages: + if msg.N == nil { + continue + } + rm, err := icmp.ParseMessage(53, msg.Msg[:*msg.N]) + if err != nil { + log.Println(err) + continue + } + log.Println(msg.Peer) + switch rm.Type { + case ipv6.ICMPTypeTimeExceeded: + t.handleICMPMessage(msg, rm.Body.(*icmp.TimeExceeded).Data) + case ipv6.ICMPTypeDestinationUnreachable: + t.handleICMPMessage(msg, rm.Body.(*icmp.DstUnreach).Data) + default: + //log.Println("received icmp message of unknown type", rm.Type) + } + } + } + +} + +// @title listenTCP +// @description 监听TCP的响应数据包 +func (t *TCPTracerv6) listenTCP() { + lc := listener_channel.New(t.tcp) + + defer lc.Stop() + + go lc.Start() + + for { + select { + case <-t.ctx.Done(): + return + case msg := <-lc.Messages: + if msg.N == nil { + continue + } + if msg.Peer.String() != t.DestIP.String() { + continue + } + + // 解包 + packet := gopacket.NewPacket(msg.Msg[:*msg.N], layers.LayerTypeTCP, gopacket.Default) + // 从包中获取TCP layer信息 + if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil { + tcp, _ := tcpLayer.(*layers.TCP) + // 取得目标主机的Sequence Number + + if ch, ok := t.inflightRequest[int(tcp.Ack-1)]; ok { + // 最后一跳 + ch <- Hop{ + Success: true, + Address: msg.Peer, + } + } + } + } + } +} + +func (t *TCPTracerv6) handleICMPMessage(msg ReceivedMessage, data []byte) { + header, err := util.GetICMPResponsePayload(data) + if err != nil { + return + } + sequenceNumber := util.GetTCPSeq(header) + t.inflightRequestLock.Lock() + defer t.inflightRequestLock.Unlock() + ch, ok := t.inflightRequest[int(sequenceNumber)] + if !ok { + return + } + ch <- Hop{ + Success: true, + Address: msg.Peer, + } + +} + +func (t *TCPTracerv6) send(ttl int) error { + err := t.sem.Acquire(context.Background(), 1) + if err != nil { + return err + } + defer t.sem.Release(1) + + defer t.wg.Done() + if t.final != -1 && ttl > t.final { + return nil + } + // 随机种子 + r := rand.New(rand.NewSource(time.Now().UnixNano())) + _, srcPort := util.LocalIPPort(t.DestIP) + ipHeader := &layers.IPv6{ + SrcIP: t.SrcIP, + DstIP: t.DestIP, + NextHeader: layers.IPProtocolTCP, + HopLimit: uint8(ttl), + } + // 使用Uint16兼容32位系统,防止在rand的时候因使用int32而溢出 + sequenceNumber := uint32(r.Intn(math.MaxUint16)) + tcpHeader := &layers.TCP{ + SrcPort: layers.TCPPort(srcPort), + DstPort: layers.TCPPort(t.DestPort), + Seq: sequenceNumber, + SYN: true, + Window: 14600, + } + _ = tcpHeader.SetNetworkLayerForChecksum(ipHeader) + + buf := gopacket.NewSerializeBuffer() + opts := gopacket.SerializeOptions{ + ComputeChecksums: true, + FixLengths: true, + } + if err := gopacket.SerializeLayers(buf, opts, tcpHeader); err != nil { + return err + } + + ipv6.NewPacketConn(t.tcp) + if err != nil { + return err + } + + start := time.Now() + if _, err := t.tcp.WriteTo(buf.Bytes(), &net.IPAddr{IP: t.DestIP}); err != nil { + return err + } + t.inflightRequestLock.Lock() + hopCh := make(chan Hop) + t.inflightRequest[int(sequenceNumber)] = hopCh + t.inflightRequestLock.Unlock() + + select { + case <-t.ctx.Done(): + return nil + case h := <-hopCh: + rtt := time.Since(start) + if t.final != -1 && ttl > t.final { + return nil + } + + if addr, ok := h.Address.(*net.IPAddr); ok && addr.IP.Equal(t.DestIP) { + t.finalLock.Lock() + if t.final == -1 || ttl < t.final { + t.final = ttl + } + t.finalLock.Unlock() + } else if addr, ok := h.Address.(*net.TCPAddr); ok && addr.IP.Equal(t.DestIP) { + t.finalLock.Lock() + if t.final == -1 || ttl < t.final { + t.final = ttl + } + t.finalLock.Unlock() + } + + h.TTL = ttl + h.RTT = rtt + + h.fetchIPData(t.Config) + + t.res.add(h) + + case <-time.After(t.Timeout): + if t.final != -1 && ttl > t.final { + return nil + } + + t.res.add(Hop{ + Success: false, + Address: nil, + TTL: ttl, + RTT: 0, + Error: ErrHopLimitTimeout, + }) + } + + return nil +} diff --git a/trace/temp_printer.go b/trace/temp_printer.go new file mode 100644 index 00000000..d067adf2 --- /dev/null +++ b/trace/temp_printer.go @@ -0,0 +1,76 @@ +package trace + +import ( + "fmt" + "strings" + + "github.com/xgadget-lab/nexttrace/ipgeo" +) + +func HopPrinter(h Hop) { + if h.Address == nil { + fmt.Println("\t*") + } else { + txt := "\t" + + if h.Hostname == "" { + txt += fmt.Sprint(h.Address, " ", fmt.Sprintf("%.2f", h.RTT.Seconds()*1000), "ms") + } else { + txt += fmt.Sprint(h.Hostname, " (", h.Address, ") ", fmt.Sprintf("%.2f", h.RTT.Seconds()*1000), "ms") + } + + if h.Geo != nil { + txt += " " + formatIpGeoData(h.Address.String(), h.Geo) + } + + fmt.Println(txt) + } +} + +func formatIpGeoData(ip string, data *ipgeo.IPGeoData) string { + var res = make([]string, 0, 10) + + if data.Asnumber == "" { + res = append(res, "*") + } else { + res = append(res, "AS"+data.Asnumber) + } + + // TODO: 判断阿里云和腾讯云内网,数据不足,有待进一步完善 + // TODO: 移动IDC判断到Hop.fetchIPData函数,减少API调用 + if strings.HasPrefix(ip, "9.") { + res = append(res, "局域网", "腾讯云") + } else if strings.HasPrefix(ip, "11.") { + res = append(res, "局域网", "阿里云") + } else if data.Country == "" { + res = append(res, "局域网") + } else { + // 有些IP的归属信息为空,这个时候将ISP的信息填入 + if data.Owner == "" { + data.Owner = data.Isp + } + if data.District != "" { + data.City = data.City + ", " + data.District + } + if data.Prov == "" && data.City == "" { + // anyCast或是骨干网数据不应该有国家信息 + data.Owner = data.Owner + ", " + data.Owner + } else { + // 非骨干网正常填入IP的国家信息数据 + res = append(res, data.Country) + } + + if data.Prov != "" { + res = append(res, data.Prov) + } + if data.City != "" { + res = append(res, data.City) + } + + if data.Owner != "" { + res = append(res, data.Owner) + } + } + + return strings.Join(res, ", ") +} diff --git a/trace/trace.go b/trace/trace.go index 09742c81..4649b5df 100644 --- a/trace/trace.go +++ b/trace/trace.go @@ -25,6 +25,7 @@ type Config struct { Quic bool IPGeoSource ipgeo.Source RDns bool + RoutePath bool } type Method string @@ -54,11 +55,24 @@ func Traceroute(method Method, config Config) (*Result, error) { switch method { case ICMPTrace: - tracer = &ICMPTracer{Config: config} + if config.DestIP.To4() != nil { + tracer = &ICMPTracer{Config: config} + } else { + tracer = &ICMPTracerv6{Config: config} + } + case UDPTrace: - tracer = &UDPTracer{Config: config} + if config.DestIP.To4() != nil { + tracer = &UDPTracer{Config: config} + } else { + return nil, errors.New("IPv6 UDP Traceroute is not supported") + } case TCPTrace: - tracer = &TCPTracer{Config: config} + if config.DestIP.To4() != nil { + tracer = &TCPTracer{Config: config} + } else { + return nil, errors.New("IPv6 TCP Traceroute is not supported") + } default: return &Result{}, ErrInvalidMethod } diff --git a/trace/udp.go b/trace/udp.go index f7ff0ce8..98005f89 100644 --- a/trace/udp.go +++ b/trace/udp.go @@ -1,6 +1,11 @@ package trace import ( + "log" + "net" + "sync" + "time" + "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/xgadget-lab/nexttrace/util" @@ -8,10 +13,6 @@ import ( "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/sync/semaphore" - "log" - "net" - "sync" - "time" ) type UDPTracer struct { @@ -99,8 +100,8 @@ func (t *UDPTracer) handleICMPMessage(msg ReceivedMessage, data []byte) { return } srcPort := util.GetUDPSrcPort(header) - t.inflightRequestLock.Lock() - defer t.inflightRequestLock.Unlock() + //t.inflightRequestLock.Lock() + //defer t.inflightRequestLock.Unlock() ch, ok := t.inflightRequest[int(srcPort)] if !ok { return @@ -128,7 +129,6 @@ func (t *UDPTracer) getUDPConn(try int) (net.IP, int, net.PacketConn) { } return t.getUDPConn(try + 1) } - return srcIP, udpConn.LocalAddr().(*net.UDPAddr).Port, udpConn } @@ -184,6 +184,7 @@ func (t *UDPTracer) send(ttl int) error { return err } + // 在对inflightRequest进行写操作的时候应该加锁保护,以免多个goroutine协程试图同时写入造成panic t.inflightRequestLock.Lock() hopCh := make(chan Hop) t.inflightRequest[srcPort] = hopCh diff --git a/util/util.go b/util/util.go index 3c7dc6e6..e1f2bcf7 100644 --- a/util/util.go +++ b/util/util.go @@ -36,12 +36,13 @@ func DomainLookUp(host string) net.IP { var ipv6Flag = false for _, ip := range ips { + ipSlice = append(ipSlice, ip) // 仅返回ipv4的ip - if ip.To4() != nil { - ipSlice = append(ipSlice, ip) - } else { - ipv6Flag = true - } + // if ip.To4() != nil { + // ipSlice = append(ipSlice, ip) + // } else { + // ipv6Flag = true + // } } if ipv6Flag {