WAIT_FOR_ACKS
DataWriter::wait_for_acknowledgments()
This method is defined in formal/07-01-01 Section: 7.1.2.4.1.12
Issue 8551: http://www.omg.org/issues/issue8551.txt
Mar 11, 2005 - received
Aug 1, 2005 - closed
Adds DataWriter::wait_for_acknowledgments(). delete_datawriter() is to
delete the publication immediately. Use of this new method can delay the
application's call to delete_datawriter().
Issue 8557: http://www.omg.org/issues/issue8557.txt
Mar 11, 2005 - received
Aug 1, 2005 - closed
Resolution is the same as for Issue 8551.
========================================================================
=== DERIVED CONSTRAINTS:
*) Use the DataSampleHeader::sequence_ value to track samples in transit;
*) Only send acks from subscriptions when requested (minimizes unnecessary
traffic);
*) This is only implemented for RELIABLE transport policy values;
BEST_EFFORT transports will drop samples and return immediately (from
spec.).
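
The constraints above can be illustrated with a minimal sketch. It assumes a
single sequence counter per writer and a signed 64-bit sequence type; the
names SequenceTracker, Reliability and must_wait() are hypothetical and are
not part of OpenDDS.

```cpp
#include <cstdint>

// Hypothetical stand-in for the reliability policy kinds; not an OpenDDS type.
enum class Reliability { BestEffort, Reliable };

// Tracks samples in transit using sequence values only (assumed layout).
struct SequenceTracker {
  std::int64_t last_sent = -1;   // highest sequence_ value written so far
  std::int64_t last_acked = -1;  // highest sequence_ value acknowledged so far

  // Assign the next sequence value to an outgoing sample.
  std::int64_t next() { return ++last_sent; }

  // Record an acknowledgment; a stale ack never moves the mark backwards.
  void acked(std::int64_t sequence) {
    if (sequence > last_acked) last_acked = sequence;
  }

  // Number of samples sent but not yet acknowledged.
  std::int64_t in_transit() const { return last_sent - last_acked; }
};

// Only RELIABLE writers with unacknowledged samples need to wait;
// BEST_EFFORT writers return immediately.
bool must_wait(Reliability kind, const SequenceTracker& tracker) {
  return kind == Reliability::Reliable && tracker.in_transit() > 0;
}
```

This sketch tracks one peer only; a writer with several associated
subscriptions would need one acknowledged mark per subscription.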
========================================================================
=== USE CASE: synchronize publication deletion to avoid sample loss
=== SCENARIO: application calls wait_for_acknowledgments()
1) Application calls DataWriter::wait_for_acknowledgments()
A) Control sample sent to all subscriptions associated with this
publication, DataSampleHeader with MessageId==REQUEST_ACK NOTE(1);
content includes last DataSampleHeader::sequence_ value sent by
this publication NOTE(2,3).
B) Thread blocks waiting on condition NOTE(4).
C) When the condition is signaled, check if each associated
subscription has sent a SAMPLE_ACK message and that the message
includes the last published sample sequence. If not, then wait on
the condition again NOTE(5).
2) Subscription receives REQUEST_ACK message.
A) REQUEST_ACK message is read to determine the end sample
DataSampleHeader::sequence_ value.
B) If that sample has not been received, establish a time at which to
abandon waiting for that sample NOTE(6).
C) As each SAMPLE_DATA message is received, check its sequence_ value
against the requested end sample value.
D) If the end sample sequence_ value or greater is observed then send
a SAMPLE_ACK message with the last sample sequence_ value observed
NOTE(7).
NOTES:
1) Add MessageId enumeration values: REQUEST_ACK, SAMPLE_ACK
2) This implies that each publication is writing with an incrementing
sequence_ value at all times. Currently this is done on a per-instance
basis. We need to change this to be a single sequence value
for the entire DataWriter.
3) The control message contents will be serialized / deserialized
directly and not rely on any IDL definition for the content.
4) Each DataWriter will have a dedicated condition that can be signaled
on reception of SAMPLE_ACK messages so that the blocking condition
can be rechecked.
5) A bounding timeout is part of the call to this method as well.
6) The max_wait value from the publication side call will be used to
bound this interval. If the subscription retains this value as its
deadline, that deadline is guaranteed not to pass before the deadline
is reached on the publication side, so the subscription will never
fail to respond because its own deadline expired first.
7) This also will be serialized / deserialized without recourse to an
IDL data definition.
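
The publication side of the scenario (steps 1B and 1C, with notes 4 and 5)
can be sketched as a condition wait with a bounding timeout. This is an
illustration under stated assumptions, not the OpenDDS implementation: the
class AckWaiter and the integer SubscriptionId are hypothetical, standard
C++ primitives stand in for whatever locking the framework uses, and the
sending of the REQUEST_ACK control sample is omitted.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>

// Hypothetical sketch; member names follow the notes, not the OpenDDS API.
class AckWaiter {
public:
  using SubscriptionId = int;  // assumption: placeholder for the real id type

  // Register an associated subscription with no acknowledgment yet.
  void add_subscription(SubscriptionId id) {
    std::lock_guard<std::mutex> guard(wfaLock_);
    acked_.emplace(id, -1);
  }

  // Plays the deliver_ack() role: record a SAMPLE_ACK and wake any waiter.
  void deliver_ack(SubscriptionId id, std::int64_t sequence) {
    {
      std::lock_guard<std::mutex> guard(wfaLock_);
      auto it = acked_.find(id);
      if (it == acked_.end()) return;  // not an associated subscription
      if (sequence > it->second) it->second = sequence;
    }
    wfaCondition_.notify_all();
  }

  // Blocks until every subscription has acknowledged last_sent or max_wait
  // elapses. Returns true if all acknowledgments arrived in time.
  bool wait_for_acknowledgments(std::int64_t last_sent,
                                std::chrono::milliseconds max_wait) {
    std::unique_lock<std::mutex> lock(wfaLock_);
    // The predicate is rechecked on every wakeup, as in step 1C.
    return wfaCondition_.wait_for(lock, max_wait,
                                  [&] { return all_acked(last_sent); });
  }

private:
  // Caller must hold wfaLock_.
  bool all_acked(std::int64_t last_sent) const {
    for (const auto& entry : acked_) {
      if (entry.second < last_sent) return false;
    }
    return true;
  }

  std::mutex wfaLock_;
  std::condition_variable wfaCondition_;
  std::map<SubscriptionId, std::int64_t> acked_;
};
```

An acknowledgment carrying a sequence value greater than the requested one
also satisfies the wait, which matches step 2D.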
========================================================================
=== CODE MODIFICATIONS:
DdsDcpsPublication.idl:
ADD METHOD: DataWriter::wait_for_acknowledgments( in Duration_t max_wait);
DataSampleHeader:
ADD MEMBER: MessageId::REQUEST_ACK and MessageId::SAMPLE_ACK
TransportSendListener:
ADD METHOD: deliver_ack()
- new method for handling SAMPLE_ACK messages.
DataWriterImpl:
ADD MEMBER: wfaLock_
ADD MEMBER: wfaCondition_
ADD METHOD: wait_for_acknowledgments()
- implement specified behavior. This includes forming
and sending a REQUEST_ACK message with the most
recent sequence_ value and then blocking on the
wfaCondition_. When unblocked, check if all
associated subscriptions have acknowledged the last
sample; if not, then continue to block on the
signal. Check that the timeout has not been
exceeded as well.
ADD METHOD: deliver_ack()
- handle reception of SAMPLE_ACK messages. This will
consist of tracking acks from each associated
subscription and signalling the wfaCondition_.
MODIFY METHOD: write()
- Add and increment DataSampleHeader::sequence_
when sending.
SimpleTcpReceiveStrategy:
ReliableMulticastTransportReceiveStrategy:
MODIFY METHOD: deliver_sample()
- add case for SAMPLE_ACK message Id to dispatch
to new framework deliver_ack() method.
DataLink:
ADD METHOD: ack_received()
- route the SAMPLE_ACK messages to the appropriate
DataWriter for disposition. This uses the serialized
publication Id value of the control message sample
to determine which publication should receive
the message.
MODIFY METHOD: make_reservation()
- add TransportSendListener parameter.
TransportImpl:
MODIFY METHOD: make_reservation()
- add TransportSendListener parameter.
TransportInterface:
MODIFY METHOD: add_subscriptions()
MODIFY METHOD: add_publications()
MODIFY METHOD: add_associations()
- add TransportSendListener parameter.
DataReaderImpl::WriterInfo:
ADD MEMBER: ack_timer_id_; Initialize to -1;
ADD MEMBER: ack_sequence_end_; Initialize to -1;
ADD MEMBER: latest_sequence_; Initialize to -1;
WriterInfo:
ADD MEMBER: ack_deadlines_
- list of deadlines for each requested sequence number.
ADD METHOD: ack_deadline()
- add a new ack_deadline. If this is a request
for a sequence number that already has a deadline,
only update the value if the new deadline is *later*
than the old one.
ADD METHOD: should_ack()
- determine if a SAMPLE_ACK message should be sent.
DataReaderImpl:
MODIFY METHOD: data_received()
- add a case for REQUEST_ACK. This should determine
the deadline after which we no longer need to send
a SAMPLE_ACK response message and check the highest
sequence_ value received against the request's value.
If the sequence_end_ value has already been received,
send a SAMPLE_ACK message. Store the sequence and
its deadline associated with the publication to
which the response should be sent. Determine and
store the deadline after which we should not send
a SAMPLE_ACK message.
- determine if SAMPLE_ACK messages should be sent as
new samples are received and their sequence numbers
evaluated.
ADD METHOD: send_sample_ack()
- internal utility method to actually form and send
the SAMPLE_ACK messages that need to be sent.
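
The subscription-side bookkeeping described for WriterInfo (ack_deadline()
and should_ack()) can be sketched as follows. This is only one possible
reading of the notes above: the class name WriterAckState, the
sample_received() helper, the use of std::chrono::steady_clock, and the
choice to drop a request once it is satisfied or expired are all
assumptions, not the OpenDDS code.

```cpp
#include <chrono>
#include <cstdint>
#include <map>

// Hypothetical sketch of per-writer ack bookkeeping on the subscription side.
class WriterAckState {
public:
  using Clock = std::chrono::steady_clock;
  using TimePoint = Clock::time_point;

  // Record a requested sequence number. If a deadline already exists for it,
  // replace the deadline only when the new one is later.
  void ack_deadline(std::int64_t sequence, TimePoint when) {
    auto it = ack_deadlines_.find(sequence);
    if (it == ack_deadlines_.end()) {
      ack_deadlines_.emplace(sequence, when);
    } else if (when > it->second) {
      it->second = when;
    }
  }

  // Note the sequence value of a received SAMPLE_DATA message.
  void sample_received(std::int64_t sequence) {
    if (sequence > latest_sequence_) latest_sequence_ = sequence;
  }

  // Drops expired requests, then reports whether a SAMPLE_ACK is due now.
  // Satisfied requests are removed so each one triggers at most one ack.
  bool should_ack(TimePoint now) {
    bool due = false;
    for (auto it = ack_deadlines_.begin(); it != ack_deadlines_.end();) {
      if (it->second < now) {
        it = ack_deadlines_.erase(it);  // deadline passed: stop waiting
      } else if (it->first <= latest_sequence_) {
        due = true;                     // requested end sample has arrived
        it = ack_deadlines_.erase(it);
      } else {
        ++it;
      }
    }
    return due;
  }

  // Value to place in the SAMPLE_ACK: the last sequence observed.
  std::int64_t latest_sequence() const { return latest_sequence_; }

private:
  std::map<std::int64_t, TimePoint> ack_deadlines_;
  std::int64_t latest_sequence_ = -1;
};
```

In this sketch, data_received() would call ack_deadline() when a REQUEST_ACK
arrives and sample_received() for each data sample, then consult
should_ack() to decide whether send_sample_ack() needs to run.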