-
Notifications
You must be signed in to change notification settings - Fork 2
/
seq.go
183 lines (161 loc) · 3.75 KB
/
seq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package ping
import (
"context"
"net"
"sync"
"time"
)
// pending holds the state of a single in-flight request, from the moment a
// sequence number is assigned to it until that number is freed.
type pending struct {
// ctx is the context of the sender; once it is done, replies are no longer
// delivered to the sender.
ctx context.Context
// at is the timestamp when the request was sent or, while an early reply
// is being held, the timestamp when that reply was received.
at time.Time
// earlyReply is a Reply without RTT set, kept
// because the TX timestamp has not been received yet.
earlyReply *Reply
// reply is the channel on which the Reply is delivered to the sender.
reply chan<- Reply
}
// sequences associates ICMP sequence numbers with pending requests.
type sequences struct {
// r hands out sequence numbers that are not currently in use.
r reserve
// m maps an allocated sequence number to its pending request.
m map[uint16]*pending
// mu guards m.
mu sync.RWMutex
}
// newSequences creates an empty table of pending requests together with a
// freshly filled reserve of sequence numbers.
func newSequences() *sequences {
	s := new(sequences)
	s.m = make(map[uint16]*pending)
	s.r = newReserve()
	return s
}
// close releases the reserve of sequence numbers held by s.
func (s *sequences) close() {
	numbers := s.r
	numbers.close()
}
// add reserves an available ICMP sequence number and registers a pending
// request for it. It returns the number and the channel on which the Reply
// will be delivered. If no number could be obtained (for example because ctx
// is done), the error from the reserve is returned.
func (s *sequences) add(ctx context.Context) (uint16, <-chan Reply, error) {
	seq, err := s.r.get(ctx)
	if err != nil {
		return 0, nil, err
	}

	// One slot of buffer lets a single reply be handed over without the
	// dispatcher waiting for the sender to receive it.
	ch := make(chan Reply, 1)
	entry := &pending{ctx: ctx, reply: ch}

	s.mu.Lock()
	entry.at = time.Now()
	s.m[seq] = entry
	s.mu.Unlock()

	return seq, ch, nil
}
// sentAt records the transmit timestamp for the request with the given ICMP
// sequence number. If a reply has already been parked as an early reply, its
// RTT is computed now and the reply is delivered to the sender, unless the
// sender's context is done.
// It must not be used concurrently with sequences.reply().
func (s *sequences) sentAt(seq uint16, sentAt time.Time) {
	pend := s.get(seq)
	if pend == nil {
		// nothing is registered under this sequence number
		return
	}

	early := pend.earlyReply
	if early == nil {
		// no reply seen yet: just remember when the request left
		pend.at = sentAt
		return
	}

	// pend.at holds the receive time of the parked reply
	early.RTT = nonNegative(pend.at.Sub(sentAt))
	select {
	case pend.reply <- *early:
	case <-pend.ctx.Done():
		// sender gave up waiting for the reply
	}
}
// reply hands the reply for the given sequence number over to its sender.
// While the pending timestamp is still zero (the TX timestamp has not been
// received yet) the reply is parked as an early reply without RTT and
// receivedAt is remembered instead. Otherwise the RTT is computed and the
// reply is delivered, unless the sender's context is done.
// Replies for sequence numbers that are not registered are dropped.
// It must not be used concurrently with sequences.sentAt().
func (s *sequences) reply(from net.IP, seq uint16, receivedAt time.Time,
	payload []byte, ttl uint8, icmpErr ICMPError) {
	pend := s.get(seq)
	if pend == nil {
		return
	}

	rep := Reply{From: from, TTL: ttl, Data: payload, Err: icmpErr}

	if pend.at.IsZero() {
		// TX timestamp has not been received yet
		pend.at = receivedAt
		pend.earlyReply = &rep
		return
	}

	rep.RTT = nonNegative(receivedAt.Sub(pend.at))
	select {
	case pend.reply <- rep:
	case <-pend.ctx.Done():
		// sender gave up waiting for the reply
	}
}
// get looks up the pending request registered for the given ICMP sequence
// number. It returns nil when there is none.
func (s *sequences) get(seq uint16) *pending {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.m[seq]
}
// free removes the pending request registered for the given ICMP sequence
// number and gives the number back to the reserve. It returns the removed
// request, or nil if nothing was registered under that number.
func (s *sequences) free(seq uint16) *pending {
	s.mu.Lock()
	p, ok := s.m[seq]
	if ok {
		delete(s.m, seq)
	}
	s.mu.Unlock()

	if ok {
		// only numbers that were actually registered go back to the reserve
		s.r.free(seq)
		return p
	}
	return nil
}
// nonNegative clamps d from below at zero: a negative duration becomes 0,
// any other value is returned unchanged.
func nonNegative(d time.Duration) time.Duration {
	if d >= 0 {
		return d
	}
	return 0
}
// reserve is a pool of unique uint16 numbers, implemented as a buffered
// channel: receiving takes a number out of the pool, sending puts it back.
type reserve chan uint16

// newReserve creates a reserve pre-filled with every uint16 value, in
// ascending order.
func newReserve() reserve {
	const total = 1 << 16
	pool := make(chan uint16, total)
	for n := 0; n < total; n++ {
		pool <- uint16(n)
	}
	return pool
}

// close closes the underlying channel. free() must not be called afterwards,
// since sending on a closed channel panics.
func (r reserve) close() {
	close(r)
}

// get takes a number out of the reserve, or returns the context's error if
// ctx is done. The returned number should later be handed back with
// reserve.free() to make it available for future use. Once get() has returned
// a number, no other call to get() returns the same number until free() is
// called with it.
func (r reserve) get(ctx context.Context) (uint16, error) {
	select {
	case n := <-r:
		return n, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// free puts id back into the reserve so that reserve.get() can return it
// again.
func (r reserve) free(id uint16) {
	r <- id
}
}