summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--media/libstagefright/wifi-display/rtp/RTPReceiver.cpp272
-rw-r--r--media/libstagefright/wifi-display/rtp/RTPReceiver.h8
2 files changed, 215 insertions, 65 deletions
diff --git a/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp b/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp
index 238fb82..8fa1dae 100644
--- a/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp
+++ b/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp
@@ -30,11 +30,13 @@
#include <media/stagefright/MediaErrors.h>
#include <media/stagefright/Utils.h>
+#define TRACK_PACKET_LOSS 0
+
namespace android {
////////////////////////////////////////////////////////////////////////////////
-struct RTPReceiver::Source : public RefBase {
+struct RTPReceiver::Source : public AHandler {
Source(RTPReceiver *receiver, uint32_t ssrc);
void onPacketReceived(uint16_t seq, const sp<ABuffer> &buffer);
@@ -44,7 +46,14 @@ struct RTPReceiver::Source : public RefBase {
protected:
virtual ~Source();
+ virtual void onMessageReceived(const sp<AMessage> &msg);
+
private:
+ enum {
+ kWhatRetransmit,
+ kWhatDeclareLost,
+ };
+
static const uint32_t kMinSequential = 2;
static const uint32_t kMaxDropout = 3000;
static const uint32_t kMaxMisorder = 100;
@@ -67,6 +76,17 @@ private:
// Ordered by extended seq number.
List<sp<ABuffer> > mPackets;
+ enum StatusBits {
+ STATUS_DECLARED_LOST = 1,
+ STATUS_REQUESTED_RETRANSMISSION = 2,
+ STATUS_ARRIVED_LATE = 4,
+ };
+#if TRACK_PACKET_LOSS
+ KeyedVector<int32_t, uint32_t> mLostPackets;
+#endif
+
+ void modifyPacketStatus(int32_t extSeqNo, uint32_t mask);
+
int32_t mAwaitingExtSeqNo;
bool mRequestedRetransmission;
@@ -78,12 +98,20 @@ private:
int32_t mNumDeclaredLost;
int32_t mNumDeclaredLostPrior;
+ int32_t mRetransmitGeneration;
+ int32_t mDeclareLostGeneration;
+ bool mDeclareLostTimerPending;
+
void queuePacket(const sp<ABuffer> &packet);
void dequeueMore();
sp<ABuffer> getNextPacket();
void resync();
+ void postRetransmitTimer(int64_t delayUs);
+ void postDeclareLostTimer(int64_t delayUs);
+ void cancelTimers();
+
DISALLOW_EVIL_CONSTRUCTORS(Source);
};
@@ -106,12 +134,71 @@ RTPReceiver::Source::Source(RTPReceiver *receiver, uint32_t ssrc)
mActivePacketType(-1),
mNextReportTimeUs(-1ll),
mNumDeclaredLost(0),
- mNumDeclaredLostPrior(0) {
+ mNumDeclaredLostPrior(0),
+ mRetransmitGeneration(0),
+ mDeclareLostGeneration(0),
+ mDeclareLostTimerPending(false) {
}
RTPReceiver::Source::~Source() {
}
+void RTPReceiver::Source::onMessageReceived(const sp<AMessage> &msg) {
+ switch (msg->what()) {
+ case kWhatRetransmit:
+ {
+ int32_t generation;
+ CHECK(msg->findInt32("generation", &generation));
+
+ if (generation != mRetransmitGeneration) {
+ break;
+ }
+
+ mRequestedRetransmission = true;
+ mReceiver->requestRetransmission(mSSRC, mAwaitingExtSeqNo);
+
+ modifyPacketStatus(
+ mAwaitingExtSeqNo, STATUS_REQUESTED_RETRANSMISSION);
+ break;
+ }
+
+ case kWhatDeclareLost:
+ {
+ int32_t generation;
+ CHECK(msg->findInt32("generation", &generation));
+
+ if (generation != mDeclareLostGeneration) {
+ break;
+ }
+
+ cancelTimers();
+
+ ALOGV("Lost packet extSeqNo %d %s",
+ mAwaitingExtSeqNo,
+ mRequestedRetransmission ? "*" : "");
+
+ mRequestedRetransmission = false;
+ if (mActiveAssembler != NULL) {
+ mActiveAssembler->signalDiscontinuity();
+ }
+
+ modifyPacketStatus(mAwaitingExtSeqNo, STATUS_DECLARED_LOST);
+
+ // resync();
+ ++mAwaitingExtSeqNo;
+ ++mNumDeclaredLost;
+
+ mReceiver->notifyPacketLost();
+
+ dequeueMore();
+ break;
+ }
+
+ default:
+ TRESPASS();
+ }
+}
+
void RTPReceiver::Source::onPacketReceived(
uint16_t seq, const sp<ABuffer> &buffer) {
if (mFirst) {
@@ -164,6 +251,8 @@ void RTPReceiver::Source::queuePacket(const sp<ABuffer> &packet) {
if (mAwaitingExtSeqNo >= 0 && newExtendedSeqNo < mAwaitingExtSeqNo) {
// We're no longer interested in these. They're old.
ALOGV("dropping stale extSeqNo %d", newExtendedSeqNo);
+
+ modifyPacketStatus(newExtendedSeqNo, STATUS_ARRIVED_LATE);
return;
}
@@ -230,85 +319,89 @@ void RTPReceiver::Source::dequeueMore() {
}
mNextReportTimeUs = nowUs + kReportIntervalUs;
- }
- for (;;) {
- sp<ABuffer> packet = getNextPacket();
+#if TRACK_PACKET_LOSS
+ for (size_t i = 0; i < mLostPackets.size(); ++i) {
+ int32_t key = mLostPackets.keyAt(i);
+ uint32_t value = mLostPackets.valueAt(i);
- if (packet == NULL) {
- if (mPackets.empty()) {
- break;
+ AString status;
+ if (value & STATUS_REQUESTED_RETRANSMISSION) {
+ status.append("retrans ");
+ }
+ if (value & STATUS_ARRIVED_LATE) {
+ status.append("arrived-late ");
}
+ ALOGI("Packet %d declared lost %s", key, status.c_str());
+ }
+#endif
+ }
+
+ sp<ABuffer> packet;
+ while ((packet = getNextPacket()) != NULL) {
+ if (mDeclareLostTimerPending) {
+ cancelTimers();
+ }
+
+ CHECK_GE(mAwaitingExtSeqNo, 0);
+#if TRACK_PACKET_LOSS
+ mLostPackets.removeItem(mAwaitingExtSeqNo);
+#endif
- CHECK_GE(mAwaitingExtSeqNo, 0);
+ int32_t packetType;
+ CHECK(packet->meta()->findInt32("PT", &packetType));
- const sp<ABuffer> &firstPacket = *mPackets.begin();
+ if (packetType != mActivePacketType) {
+ mActiveAssembler = mReceiver->makeAssembler(packetType);
+ mActivePacketType = packetType;
+ }
- uint32_t rtpTime;
- CHECK(firstPacket->meta()->findInt32(
- "rtp-time", (int32_t *)&rtpTime));
+ if (mActiveAssembler != NULL) {
+ status_t err = mActiveAssembler->processPacket(packet);
+ if (err != OK) {
+ ALOGV("assembler returned error %d", err);
+ }
+ }
+ ++mAwaitingExtSeqNo;
+ }
- int64_t rtpUs = (rtpTime * 100ll) / 9ll;
+ if (mDeclareLostTimerPending) {
+ return;
+ }
- int64_t maxArrivalTimeUs =
- mFirstArrivalTimeUs + rtpUs - mFirstRTPTimeUs;
+ if (mPackets.empty()) {
+ return;
+ }
- int64_t nowUs = ALooper::GetNowUs();
+ CHECK_GE(mAwaitingExtSeqNo, 0);
- CHECK_LT(mAwaitingExtSeqNo, firstPacket->int32Data());
+ const sp<ABuffer> &firstPacket = *mPackets.begin();
- ALOGV("waiting for %d, comparing against %d, %lld us left",
- mAwaitingExtSeqNo,
- firstPacket->int32Data(),
- maxArrivalTimeUs - nowUs);
+ uint32_t rtpTime;
+ CHECK(firstPacket->meta()->findInt32(
+ "rtp-time", (int32_t *)&rtpTime));
- if (maxArrivalTimeUs + kPacketLostAfterUs <= nowUs) {
- ALOGV("Lost packet extSeqNo %d %s",
- mAwaitingExtSeqNo,
- mRequestedRetransmission ? "*" : "");
- mRequestedRetransmission = false;
- if (mActiveAssembler != NULL) {
- mActiveAssembler->signalDiscontinuity();
- }
+ int64_t rtpUs = (rtpTime * 100ll) / 9ll;
- // resync();
- ++mAwaitingExtSeqNo;
- ++mNumDeclaredLost;
-
- mReceiver->notifyPacketLost();
- continue;
- } else if (kRequestRetransmissionAfterUs > 0
- && maxArrivalTimeUs + kRequestRetransmissionAfterUs <= nowUs
- && !mRequestedRetransmission
- && mAwaitingExtSeqNo >= 0) {
- mRequestedRetransmission = true;
- mReceiver->requestRetransmission(mSSRC, mAwaitingExtSeqNo);
- break;
- } else {
- break;
- }
- }
+ int64_t maxArrivalTimeUs =
+ mFirstArrivalTimeUs + rtpUs - mFirstRTPTimeUs;
- mRequestedRetransmission = false;
+ nowUs = ALooper::GetNowUs();
- int32_t packetType;
- CHECK(packet->meta()->findInt32("PT", &packetType));
+ CHECK_LT(mAwaitingExtSeqNo, firstPacket->int32Data());
- if (packetType != mActivePacketType) {
- mActiveAssembler = mReceiver->makeAssembler(packetType);
- mActivePacketType = packetType;
- }
+ ALOGV("waiting for %d, comparing against %d, %lld us left",
+ mAwaitingExtSeqNo,
+ firstPacket->int32Data(),
+ maxArrivalTimeUs - nowUs);
- if (mActiveAssembler == NULL) {
- continue;
- }
+ postDeclareLostTimer(maxArrivalTimeUs + kPacketLostAfterUs);
- status_t err = mActiveAssembler->processPacket(packet);
- if (err != OK) {
- ALOGV("assembler returned error %d", err);
- }
+ if (kRequestRetransmissionAfterUs > 0ll) {
+ postRetransmitTimer(
+ maxArrivalTimeUs + kRequestRetransmissionAfterUs);
}
}
@@ -328,8 +421,6 @@ sp<ABuffer> RTPReceiver::Source::getNextPacket() {
sp<ABuffer> packet = *mPackets.begin();
mPackets.erase(mPackets.begin());
- ++mAwaitingExtSeqNo;
-
return packet;
}
@@ -404,9 +495,11 @@ void RTPReceiver::Source::addReportBlock(
RTPReceiver::RTPReceiver(
const sp<ANetworkSession> &netSession,
- const sp<AMessage> &notify)
+ const sp<AMessage> &notify,
+ uint32_t flags)
: mNetSession(netSession),
mNotify(notify),
+ mFlags(flags),
mRTPMode(TRANSPORT_UNDEFINED),
mRTCPMode(TRANSPORT_UNDEFINED),
mRTPSessionID(0),
@@ -693,6 +786,20 @@ void RTPReceiver::onNetNotify(bool isRTP, const sp<AMessage> &msg) {
CHECK(msg->findBuffer("data", &data));
if (isRTP) {
+ if (mFlags & FLAG_AUTO_CONNECT) {
+ AString fromAddr;
+ CHECK(msg->findString("fromAddr", &fromAddr));
+
+ int32_t fromPort;
+ CHECK(msg->findInt32("fromPort", &fromPort));
+
+ CHECK_EQ((status_t)OK,
+ connect(
+ fromAddr.c_str(), fromPort, fromPort + 1));
+
+ mFlags &= ~FLAG_AUTO_CONNECT;
+ }
+
onRTPData(data);
} else {
onRTCPData(data);
@@ -835,6 +942,8 @@ status_t RTPReceiver::onRTPData(const sp<ABuffer> &buffer) {
sp<Source> source;
if (index < 0) {
source = new Source(this, srcId);
+ looper()->registerHandler(source);
+
mSources.add(srcId, source);
} else {
source = mSources.valueAt(index);
@@ -965,6 +1074,7 @@ sp<RTPReceiver::Assembler> RTPReceiver::makeAssembler(uint8_t packetType) {
PacketizationMode mode = mPacketTypes.valueAt(index);
switch (mode) {
+ case PACKETIZATION_NONE:
case PACKETIZATION_TRANSPORT_STREAM:
return new TSAssembler(mNotify);
@@ -1005,5 +1115,39 @@ void RTPReceiver::requestRetransmission(uint32_t senderSSRC, int32_t extSeqNo) {
mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size());
}
+void RTPReceiver::Source::modifyPacketStatus(int32_t extSeqNo, uint32_t mask) {
+#if TRACK_PACKET_LOSS
+ ssize_t index = mLostPackets.indexOfKey(extSeqNo);
+ if (index < 0) {
+ mLostPackets.add(extSeqNo, mask);
+ } else {
+ mLostPackets.editValueAt(index) |= mask;
+ }
+#endif
+}
+
+void RTPReceiver::Source::postRetransmitTimer(int64_t timeUs) {
+ int64_t delayUs = timeUs - ALooper::GetNowUs();
+ sp<AMessage> msg = new AMessage(kWhatRetransmit, id());
+ msg->setInt32("generation", mRetransmitGeneration);
+ msg->post(delayUs);
+}
+
+void RTPReceiver::Source::postDeclareLostTimer(int64_t timeUs) {
+ CHECK(!mDeclareLostTimerPending);
+ mDeclareLostTimerPending = true;
+
+ int64_t delayUs = timeUs - ALooper::GetNowUs();
+ sp<AMessage> msg = new AMessage(kWhatDeclareLost, id());
+ msg->setInt32("generation", mDeclareLostGeneration);
+ msg->post(delayUs);
+}
+
+void RTPReceiver::Source::cancelTimers() {
+ ++mRetransmitGeneration;
+ ++mDeclareLostGeneration;
+ mDeclareLostTimerPending = false;
+}
+
} // namespace android
diff --git a/media/libstagefright/wifi-display/rtp/RTPReceiver.h b/media/libstagefright/wifi-display/rtp/RTPReceiver.h
index 630bce9..240ab2e 100644
--- a/media/libstagefright/wifi-display/rtp/RTPReceiver.h
+++ b/media/libstagefright/wifi-display/rtp/RTPReceiver.h
@@ -39,9 +39,14 @@ struct RTPReceiver : public RTPBase, public AHandler {
kWhatAccessUnit,
kWhatPacketLost,
};
+
+ enum Flags {
+ FLAG_AUTO_CONNECT = 1,
+ };
RTPReceiver(
const sp<ANetworkSession> &netSession,
- const sp<AMessage> &notify);
+ const sp<AMessage> &notify,
+ uint32_t flags = 0);
status_t registerPacketType(
uint8_t packetType, PacketizationMode mode);
@@ -82,6 +87,7 @@ private:
sp<ANetworkSession> mNetSession;
sp<AMessage> mNotify;
+ uint32_t mFlags;
TransportMode mRTPMode;
TransportMode mRTCPMode;
int32_t mRTPSessionID;