1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
|
/*
* Copyright (C) 2011 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "LibAAH_RTP"
//#define LOG_NDEBUG 0
#include <utils/Log.h>
#include "aah_rx_player.h"
namespace android {
AAH_RXPlayer::RXRingBuffer::RXRingBuffer(uint32_t capacity) {
capacity_ = capacity;
rd_ = wr_ = 0;
ring_ = new PacketBuffer*[capacity];
memset(ring_, 0, sizeof(PacketBuffer*) * capacity);
reset();
}
AAH_RXPlayer::RXRingBuffer::~RXRingBuffer() {
reset();
delete[] ring_;
}
void AAH_RXPlayer::RXRingBuffer::reset() {
AutoMutex lock(&lock_);
if (NULL != ring_) {
while (rd_ != wr_) {
CHECK(rd_ < capacity_);
if (NULL != ring_[rd_]) {
PacketBuffer::destroy(ring_[rd_]);
ring_[rd_] = NULL;
}
rd_ = (rd_ + 1) % capacity_;
}
}
rd_ = wr_ = 0;
rd_seq_known_ = false;
waiting_for_fast_start_ = true;
fetched_first_packet_ = false;
rtp_activity_timeout_valid_ = false;
}
bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf,
uint16_t seq) {
AutoMutex lock(&lock_);
CHECK(NULL != ring_);
CHECK(NULL != buf);
rtp_activity_timeout_valid_ = true;
rtp_activity_timeout_ = monotonicUSecNow() + kRTPActivityTimeoutUSec;
// If the ring buffer is totally reset (we have never received a single
// payload) then we don't know the rd sequence number and this should be
// simple. We just store the payload, advance the wr pointer and record the
// initial sequence number.
if (!rd_seq_known_) {
CHECK(rd_ == wr_);
CHECK(NULL == ring_[wr_]);
CHECK(wr_ < capacity_);
ring_[wr_] = buf;
wr_ = (wr_ + 1) % capacity_;
rd_seq_ = seq;
rd_seq_known_ = true;
return true;
}
// Compute the seqence number of this payload and of the write pointer,
// normalized around the read pointer. IOW - transform the payload seq no
// and the wr pointer seq no into a space where the rd pointer seq no is
// zero. This will define 4 cases we can consider...
//
// 1) norm_seq == norm_wr_seq
// This payload is contiguous with the last. All is good.
//
// 2) ((norm_seq < norm_wr_seq) && (norm_seq >= norm_rd_seq)
// aka ((norm_seq < norm_wr_seq) && (norm_seq >= 0)
// This payload is in the past, in the unprocessed region of the ring
// buffer. It is probably a retransmit intended to fill in a dropped
// payload; it may be a duplicate.
//
// 3) ((norm_seq - norm_wr_seq) & 0x8000) != 0
// This payload is in the past compared to the write pointer (or so very
// far in the future that it has wrapped the seq no space), but not in
// the unprocessed region of the ring buffer. This could be a duplicate
// retransmit; we just drop these payloads unless we are waiting for our
// first fast start packet. If we are waiting for fast start, than this
// packet is probably the first packet of the fast start retransmission.
// If it will fit in the buffer, back up the read pointer to its position
// and clear the fast start flag, otherwise just drop it.
//
// 4) ((norm_seq - norm_wr_seq) & 0x8000) == 0
// This payload which is ahead of the next write pointer. This indicates
// that we have missed some payloads and need to request a retransmit.
// If norm_seq >= (capacity - 1), then the gap is so large that it would
// overflow the ring buffer and we should probably start to panic.
uint16_t norm_wr_seq = ((wr_ + capacity_ - rd_) % capacity_);
uint16_t norm_seq = seq - rd_seq_;
// Check for overflow first.
if ((!(norm_seq & 0x8000)) && (norm_seq >= (capacity_ - 1))) {
ALOGW("Ring buffer overflow; cap = %u, [rd, wr] = [%hu, %hu],"
" seq = %hu", capacity_, rd_seq_, norm_wr_seq + rd_seq_, seq);
PacketBuffer::destroy(buf);
return false;
}
// Check for case #1
if (norm_seq == norm_wr_seq) {
CHECK(wr_ < capacity_);
CHECK(NULL == ring_[wr_]);
ring_[wr_] = buf;
wr_ = (wr_ + 1) % capacity_;
CHECK(wr_ != rd_);
return true;
}
// Check case #2
uint32_t ring_pos = (rd_ + norm_seq) % capacity_;
if ((norm_seq < norm_wr_seq) && (!(norm_seq & 0x8000))) {
// Do we already have a payload for this slot? If so, then this looks
// like a duplicate retransmit. Just ignore it.
if (NULL != ring_[ring_pos]) {
ALOGD("RXed duplicate retransmit, seq = %hu", seq);
PacketBuffer::destroy(buf);
} else {
// Looks like we were missing this payload. Go ahead and store it.
ring_[ring_pos] = buf;
}
return true;
}
// Check case #3
if ((norm_seq - norm_wr_seq) & 0x8000) {
if (!waiting_for_fast_start_) {
ALOGD("RXed duplicate retransmit from before rd pointer, seq = %hu",
seq);
PacketBuffer::destroy(buf);
} else {
// Looks like a fast start fill-in. Go ahead and store it, assuming
// that we can fit it in the buffer.
uint32_t implied_ring_size = static_cast<uint32_t>(norm_wr_seq)
+ (rd_seq_ - seq);
if (implied_ring_size >= (capacity_ - 1)) {
ALOGD("RXed what looks like a fast start packet (seq = %hu),"
" but packet is too far in the past to fit into the ring"
" buffer. Dropping.", seq);
PacketBuffer::destroy(buf);
} else {
ring_pos = (rd_ + capacity_ + seq - rd_seq_) % capacity_;
rd_seq_ = seq;
rd_ = ring_pos;
waiting_for_fast_start_ = false;
CHECK(ring_pos < capacity_);
CHECK(NULL == ring_[ring_pos]);
ring_[ring_pos] = buf;
}
}
return true;
}
// Must be in case #4 with no overflow. This packet fits in the current
// ring buffer, but is discontiuguous. Advance the write pointer leaving a
// gap behind.
uint32_t gap_len = (ring_pos + capacity_ - wr_) % capacity_;
ALOGD("Drop detected; %u packets, seq_range [%hu, %hu]",
gap_len,
rd_seq_ + norm_wr_seq,
rd_seq_ + norm_wr_seq + gap_len - 1);
CHECK(NULL == ring_[ring_pos]);
ring_[ring_pos] = buf;
wr_ = (ring_pos + 1) % capacity_;
CHECK(wr_ != rd_);
return true;
}
AAH_RXPlayer::PacketBuffer*
AAH_RXPlayer::RXRingBuffer::fetchBuffer(bool* is_discon) {
AutoMutex lock(&lock_);
CHECK(NULL != ring_);
CHECK(NULL != is_discon);
// If the read seqence number is not known, then this ring buffer has not
// received a packet since being reset and there cannot be any packets to
// return. If we are still waiting for the first fast start packet to show
// up, we don't want to let any buffer be consumed yet because we expect to
// see a packet before the initial read sequence number show up shortly.
if (!rd_seq_known_ || waiting_for_fast_start_) {
*is_discon = false;
return NULL;
}
PacketBuffer* ret = NULL;
*is_discon = !fetched_first_packet_;
while ((rd_ != wr_) && (NULL == ret)) {
CHECK(rd_ < capacity_);
// If we hit a gap, stall and do not advance the read pointer. Let the
// higher level code deal with requesting retries and/or deciding to
// skip the current gap.
ret = ring_[rd_];
if (NULL == ret) {
break;
}
ring_[rd_] = NULL;
rd_ = (rd_ + 1) % capacity_;
++rd_seq_;
}
if (NULL != ret) {
fetched_first_packet_ = true;
}
return ret;
}
AAH_RXPlayer::GapStatus
AAH_RXPlayer::RXRingBuffer::fetchCurrentGap(SeqNoGap* gap) {
AutoMutex lock(&lock_);
CHECK(NULL != ring_);
CHECK(NULL != gap);
// If the read seqence number is not known, then this ring buffer has not
// received a packet since being reset and there cannot be any gaps.
if (!rd_seq_known_) {
return kGS_NoGap;
}
// If we are waiting for fast start, then the current gap is a fast start
// gap and it includes all packets before the read sequence number.
if (waiting_for_fast_start_) {
gap->start_seq_ =
gap->end_seq_ = rd_seq_ - 1;
return kGS_FastStartGap;
}
// If rd == wr, then the buffer is empty and there cannot be any gaps.
if (rd_ == wr_) {
return kGS_NoGap;
}
// If rd_ is currently pointing at an unprocessed packet, then there is no
// current gap.
CHECK(rd_ < capacity_);
if (NULL != ring_[rd_]) {
return kGS_NoGap;
}
// Looks like there must be a gap here. The start of the gap is the current
// rd sequence number, all we need to do now is determine its length in
// order to compute the end sequence number.
gap->start_seq_ = rd_seq_;
uint16_t end = rd_seq_;
uint32_t tmp = (rd_ + 1) % capacity_;
while ((tmp != wr_) && (NULL == ring_[tmp])) {
++end;
tmp = (tmp + 1) % capacity_;
}
gap->end_seq_ = end;
return kGS_NormalGap;
}
void AAH_RXPlayer::RXRingBuffer::processNAK(const SeqNoGap* nak) {
AutoMutex lock(&lock_);
CHECK(NULL != ring_);
// If we were waiting for our first fast start fill-in packet, and we
// received a NAK, then apparantly we are not getting our fast start. Just
// clear the waiting flag and go back to normal behavior.
if (waiting_for_fast_start_) {
waiting_for_fast_start_ = false;
}
// If we have not received a packet since last reset, or there is no data in
// the ring, then there is nothing to skip.
if ((!rd_seq_known_) || (rd_ == wr_)) {
return;
}
// If rd_ is currently pointing at an unprocessed packet, then there is no
// gap to skip.
CHECK(rd_ < capacity_);
if (NULL != ring_[rd_]) {
return;
}
// Looks like there must be a gap here. Advance rd until we have passed
// over the portion of it indicated by nak (or all of the gap if nak is
// NULL). Then reset fetched_first_packet_ so that the next read will show
// up as being discontiguous.
uint16_t seq_after_gap = (NULL == nak) ? 0 : nak->end_seq_ + 1;
while ((rd_ != wr_) &&
(NULL == ring_[rd_]) &&
((NULL == nak) || (seq_after_gap != rd_seq_))) {
rd_ = (rd_ + 1) % capacity_;
++rd_seq_;
}
fetched_first_packet_ = false;
}
int AAH_RXPlayer::RXRingBuffer::computeInactivityTimeout() {
AutoMutex lock(&lock_);
if (!rtp_activity_timeout_valid_) {
return -1;
}
uint64_t now = monotonicUSecNow();
if (rtp_activity_timeout_ <= now) {
return 0;
}
return (rtp_activity_timeout_ - now) / 1000;
}
AAH_RXPlayer::PacketBuffer*
AAH_RXPlayer::PacketBuffer::allocate(ssize_t length) {
if (length <= 0) {
return NULL;
}
uint32_t alloc_len = sizeof(PacketBuffer) + length;
PacketBuffer* ret = reinterpret_cast<PacketBuffer*>(
new uint8_t[alloc_len]);
if (NULL != ret) {
ret->length_ = length;
}
return ret;
}
void AAH_RXPlayer::PacketBuffer::destroy(PacketBuffer* pb) {
uint8_t* kill_me = reinterpret_cast<uint8_t*>(pb);
delete[] kill_me;
}
} // namespace android
|