Skip to content

Commit

Permalink
refactor: icmp
Browse files Browse the repository at this point in the history
  • Loading branch information
zhshch2002 committed May 23, 2022
1 parent 8b2d2f3 commit e728b6b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 119 deletions.
89 changes: 27 additions & 62 deletions trace/icmp_ipv4.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,17 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.org/x/sync/semaphore"
)

type ICMPTracer struct {
Config
wg sync.WaitGroup
res Result
ctx context.Context
inflightRequest map[int]chan Hop
inflightRequestLock sync.Mutex
icmpListen net.PacketConn
workFork workFork
final int
finalLock sync.Mutex

sem *semaphore.Weighted
}

type workFork struct {
ttl int
num int
wg sync.WaitGroup
res Result
ctx context.Context
resCh chan Hop
icmpListen net.PacketConn
final int
finalLock sync.Mutex
}

func (t *ICMPTracer) Execute() (*Result, error) {
Expand All @@ -49,24 +39,24 @@ func (t *ICMPTracer) Execute() (*Result, error) {
var cancel context.CancelFunc
t.ctx, cancel = context.WithCancel(context.Background())
defer cancel()
t.inflightRequest = make(map[int]chan Hop)
t.resCh = make(chan Hop)
t.final = -1

go t.listenICMP()

t.sem = semaphore.NewWeighted(int64(t.ParallelRequests))

for t.workFork.ttl = 1; t.workFork.ttl <= t.MaxHops; t.workFork.ttl++ {
for ttl := 1; ttl <= t.MaxHops; ttl++ {
if t.final != -1 && ttl > t.final {
break
}
for i := 0; i < t.NumMeasurements; i++ {
t.wg.Add(1)
go t.send(workFork{t.workFork.ttl, i})
go t.send(ttl)
}
// 一组TTL全部退出(收到应答或者超时终止)以后,再进行下一个TTL的包发送
t.wg.Wait()
if t.RealtimePrinter != nil {
t.RealtimePrinter(&t.res, t.workFork.ttl-1)
t.RealtimePrinter(&t.res, ttl-1)
}
t.workFork.num = 0
}
t.res.reduce(t.final)

Expand Down Expand Up @@ -103,29 +93,15 @@ func (t *ICMPTracer) listenICMP() {
}

func (t *ICMPTracer) handleICMPMessage(msg ReceivedMessage, icmpType int8, data []byte) {

t.inflightRequestLock.Lock()
defer t.inflightRequestLock.Unlock()
ch, ok := t.inflightRequest[t.workFork.num]
t.workFork.num += 1
if !ok {
return
}
ch <- Hop{
t.resCh <- Hop{
Success: true,
Address: msg.Peer,
}
}

func (t *ICMPTracer) send(fork workFork) error {
err := t.sem.Acquire(context.Background(), 1)
if err != nil {
return err
}
defer t.sem.Release(1)

func (t *ICMPTracer) send(ttl int) error {
defer t.wg.Done()
if t.final != -1 && fork.ttl > t.final {
if t.final != -1 && ttl > t.final {
return nil
}

Expand All @@ -137,7 +113,7 @@ func (t *ICMPTracer) send(fork workFork) error {
},
}

ipv4.NewPacketConn(t.icmpListen).SetTTL(fork.ttl)
ipv4.NewPacketConn(t.icmpListen).SetTTL(ttl)

wb, err := icmpHeader.Marshal(nil)
if err != nil {
Expand All @@ -151,56 +127,45 @@ func (t *ICMPTracer) send(fork workFork) error {
if err := t.icmpListen.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
log.Fatal(err)
}
t.inflightRequestLock.Lock()
hopCh := make(chan Hop)
t.inflightRequest[fork.num] = hopCh
t.inflightRequestLock.Unlock()

// defer func() {
// t.inflightRequestLock.Lock()
// close(hopCh)
// delete(t.inflightRequest, fork.ttl)
// t.inflightRequestLock.Unlock()
// }()

select {
case <-t.ctx.Done():
return nil
case h := <-hopCh:
case h := <-t.resCh:
rtt := time.Since(start)
if t.final != -1 && fork.ttl > t.final {
if t.final != -1 && ttl > t.final {
return nil
}
if addr, ok := h.Address.(*net.IPAddr); ok && addr.IP.Equal(t.DestIP) {
t.finalLock.Lock()
if t.final == -1 || fork.ttl < t.final {
t.final = fork.ttl
if t.final == -1 || ttl < t.final {
t.final = ttl
}
t.finalLock.Unlock()
} else if addr, ok := h.Address.(*net.TCPAddr); ok && addr.IP.Equal(t.DestIP) {
t.finalLock.Lock()
if t.final == -1 || fork.ttl < t.final {
t.final = fork.ttl
if t.final == -1 || ttl < t.final {
t.final = ttl
}
t.finalLock.Unlock()
}

h.TTL = fork.ttl
h.TTL = ttl
h.RTT = rtt

h.fetchIPData(t.Config)

t.res.add(h)

case <-time.After(t.Timeout):
if t.final != -1 && fork.ttl > t.final {
if t.final != -1 && ttl > t.final {
return nil
}

t.res.add(Hop{
Success: false,
Address: nil,
TTL: fork.ttl,
TTL: ttl,
RTT: 0,
Error: ErrHopLimitTimeout,
})
Expand Down
87 changes: 30 additions & 57 deletions trace/icmp_ipv6.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,17 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv6"
"golang.org/x/sync/semaphore"
)

type ICMPTracerv6 struct {
Config
wg sync.WaitGroup
res Result
ctx context.Context
inflightRequest map[int]chan Hop
inflightRequestLock sync.Mutex
icmpListen net.PacketConn
workFork workFork
final int
finalLock sync.Mutex

sem *semaphore.Weighted
wg sync.WaitGroup
res Result
ctx context.Context
resCh chan Hop
icmpListen net.PacketConn
final int
finalLock sync.Mutex
}

func (t *ICMPTracerv6) Execute() (*Result, error) {
Expand All @@ -44,21 +39,24 @@ func (t *ICMPTracerv6) Execute() (*Result, error) {
var cancel context.CancelFunc
t.ctx, cancel = context.WithCancel(context.Background())
defer cancel()
t.inflightRequest = make(map[int]chan Hop)
t.resCh = make(chan Hop)
t.final = -1

go t.listenICMP()

t.sem = semaphore.NewWeighted(int64(t.ParallelRequests))

for t.workFork.ttl = 1; t.workFork.ttl <= t.MaxHops; t.workFork.ttl++ {
for ttl := 1; ttl <= t.MaxHops; ttl++ {
if t.final != -1 && ttl > t.final {
break
}
for i := 0; i < t.NumMeasurements; i++ {
t.wg.Add(1)
go t.send(workFork{t.workFork.ttl, i})
go t.send(ttl)
}
// 一组TTL全部退出(收到应答或者超时终止)以后,再进行下一个TTL的包发送
t.wg.Wait()
t.workFork.num = 0
if t.RealtimePrinter != nil {
t.RealtimePrinter(&t.res, ttl-1)
}
}
t.res.reduce(t.final)

Expand Down Expand Up @@ -96,29 +94,15 @@ func (t *ICMPTracerv6) listenICMP() {
}

func (t *ICMPTracerv6) handleICMPMessage(msg ReceivedMessage, icmpType int8, data []byte) {
t.inflightRequestLock.Lock()
defer t.inflightRequestLock.Unlock()
ch, ok := t.inflightRequest[t.workFork.num]
t.workFork.num += 1
if !ok {
return
}
ch <- Hop{
t.resCh <- Hop{
Success: true,
Address: msg.Peer,
}
}

func (t *ICMPTracerv6) send(fork workFork) error {
err := t.sem.Acquire(context.Background(), 1)
if err != nil {
return err
}

defer t.sem.Release(1)

func (t *ICMPTracerv6) send(ttl int) error {
defer t.wg.Done()
if t.final != -1 && fork.ttl > t.final {
if t.final != -1 && ttl > t.final {
return nil
}

Expand All @@ -132,8 +116,8 @@ func (t *ICMPTracerv6) send(fork workFork) error {

p := ipv6.NewPacketConn(t.icmpListen)

icmpHeader.Body.(*icmp.Echo).Seq = fork.ttl
p.SetHopLimit(fork.ttl)
icmpHeader.Body.(*icmp.Echo).Seq = ttl
p.SetHopLimit(ttl)

wb, err := icmpHeader.Marshal(nil)
if err != nil {
Expand All @@ -147,56 +131,45 @@ func (t *ICMPTracerv6) send(fork workFork) error {
if err := t.icmpListen.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
log.Fatal(err)
}
t.inflightRequestLock.Lock()
hopCh := make(chan Hop)
t.inflightRequest[fork.num] = hopCh
t.inflightRequestLock.Unlock()

// defer func() {
// t.inflightRequestLock.Lock()
// close(hopCh)
// delete(t.inflightRequest, fork.ttl)
// t.inflightRequestLock.Unlock()
// }()

select {
case <-t.ctx.Done():
return nil
case h := <-hopCh:
case h := <-t.resCh:
rtt := time.Since(start)
if t.final != -1 && fork.ttl > t.final {
if t.final != -1 && ttl > t.final {
return nil
}
if addr, ok := h.Address.(*net.IPAddr); ok && addr.IP.Equal(t.DestIP) {
t.finalLock.Lock()
if t.final == -1 || fork.ttl < t.final {
t.final = fork.ttl
if t.final == -1 || ttl < t.final {
t.final = ttl
}
t.finalLock.Unlock()
} else if addr, ok := h.Address.(*net.TCPAddr); ok && addr.IP.Equal(t.DestIP) {
t.finalLock.Lock()
if t.final == -1 || fork.ttl < t.final {
t.final = fork.ttl
if t.final == -1 || ttl < t.final {
t.final = ttl
}
t.finalLock.Unlock()
}

h.TTL = fork.ttl
h.TTL = ttl
h.RTT = rtt

h.fetchIPData(t.Config)

t.res.add(h)

case <-time.After(t.Timeout):
if t.final != -1 && fork.ttl > t.final {
if t.final != -1 && ttl > t.final {
return nil
}

t.res.add(Hop{
Success: false,
Address: nil,
TTL: fork.ttl,
TTL: ttl,
RTT: 0,
Error: ErrHopLimitTimeout,
})
Expand Down

0 comments on commit e728b6b

Please sign in to comment.