From 2be6121a47d3df2a0efcb73afd214f2958eb9927 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Thu, 4 Apr 2013 11:17:05 -0700 Subject: RTPReceiver can now track packet loss, account for late arrivals it also uses timers to trigger retransmission and packet loss declaration Change-Id: If1f9324783b3bef950076c2edf321f7c33ff9fea --- .../wifi-display/rtp/RTPReceiver.cpp | 272 ++++++++++++++++----- .../libstagefright/wifi-display/rtp/RTPReceiver.h | 8 +- 2 files changed, 215 insertions(+), 65 deletions(-) (limited to 'media') diff --git a/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp b/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp index 238fb82..8fa1dae 100644 --- a/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp +++ b/media/libstagefright/wifi-display/rtp/RTPReceiver.cpp @@ -30,11 +30,13 @@ #include #include +#define TRACK_PACKET_LOSS 0 + namespace android { //////////////////////////////////////////////////////////////////////////////// -struct RTPReceiver::Source : public RefBase { +struct RTPReceiver::Source : public AHandler { Source(RTPReceiver *receiver, uint32_t ssrc); void onPacketReceived(uint16_t seq, const sp &buffer); @@ -44,7 +46,14 @@ struct RTPReceiver::Source : public RefBase { protected: virtual ~Source(); + virtual void onMessageReceived(const sp &msg); + private: + enum { + kWhatRetransmit, + kWhatDeclareLost, + }; + static const uint32_t kMinSequential = 2; static const uint32_t kMaxDropout = 3000; static const uint32_t kMaxMisorder = 100; @@ -67,6 +76,17 @@ private: // Ordered by extended seq number. List > mPackets; + enum StatusBits { + STATUS_DECLARED_LOST = 1, + STATUS_REQUESTED_RETRANSMISSION = 2, + STATUS_ARRIVED_LATE = 4, + }; +#if TRACK_PACKET_LOSS + KeyedVector mLostPackets; +#endif + + void modifyPacketStatus(int32_t extSeqNo, uint32_t mask); + int32_t mAwaitingExtSeqNo; bool mRequestedRetransmission; @@ -78,12 +98,20 @@ private: int32_t mNumDeclaredLost; int32_t mNumDeclaredLostPrior; + int32_t mRetransmitGeneration; + int32_t mDeclareLostGeneration; + bool mDeclareLostTimerPending; + void queuePacket(const sp &packet); void dequeueMore(); sp getNextPacket(); void resync(); + void postRetransmitTimer(int64_t delayUs); + void postDeclareLostTimer(int64_t delayUs); + void cancelTimers(); + DISALLOW_EVIL_CONSTRUCTORS(Source); }; @@ -106,12 +134,71 @@ RTPReceiver::Source::Source(RTPReceiver *receiver, uint32_t ssrc) mActivePacketType(-1), mNextReportTimeUs(-1ll), mNumDeclaredLost(0), - mNumDeclaredLostPrior(0) { + mNumDeclaredLostPrior(0), + mRetransmitGeneration(0), + mDeclareLostGeneration(0), + mDeclareLostTimerPending(false) { } RTPReceiver::Source::~Source() { } +void RTPReceiver::Source::onMessageReceived(const sp &msg) { + switch (msg->what()) { + case kWhatRetransmit: + { + int32_t generation; + CHECK(msg->findInt32("generation", &generation)); + + if (generation != mRetransmitGeneration) { + break; + } + + mRequestedRetransmission = true; + mReceiver->requestRetransmission(mSSRC, mAwaitingExtSeqNo); + + modifyPacketStatus( + mAwaitingExtSeqNo, STATUS_REQUESTED_RETRANSMISSION); + break; + } + + case kWhatDeclareLost: + { + int32_t generation; + CHECK(msg->findInt32("generation", &generation)); + + if (generation != mDeclareLostGeneration) { + break; + } + + cancelTimers(); + + ALOGV("Lost packet extSeqNo %d %s", + mAwaitingExtSeqNo, + mRequestedRetransmission ? "*" : ""); + + mRequestedRetransmission = false; + if (mActiveAssembler != NULL) { + mActiveAssembler->signalDiscontinuity(); + } + + modifyPacketStatus(mAwaitingExtSeqNo, STATUS_DECLARED_LOST); + + // resync(); + ++mAwaitingExtSeqNo; + ++mNumDeclaredLost; + + mReceiver->notifyPacketLost(); + + dequeueMore(); + break; + } + + default: + TRESPASS(); + } +} + void RTPReceiver::Source::onPacketReceived( uint16_t seq, const sp &buffer) { if (mFirst) { @@ -164,6 +251,8 @@ void RTPReceiver::Source::queuePacket(const sp &packet) { if (mAwaitingExtSeqNo >= 0 && newExtendedSeqNo < mAwaitingExtSeqNo) { // We're no longer interested in these. They're old. ALOGV("dropping stale extSeqNo %d", newExtendedSeqNo); + + modifyPacketStatus(newExtendedSeqNo, STATUS_ARRIVED_LATE); return; } @@ -230,85 +319,89 @@ void RTPReceiver::Source::dequeueMore() { } mNextReportTimeUs = nowUs + kReportIntervalUs; - } - for (;;) { - sp packet = getNextPacket(); +#if TRACK_PACKET_LOSS + for (size_t i = 0; i < mLostPackets.size(); ++i) { + int32_t key = mLostPackets.keyAt(i); + uint32_t value = mLostPackets.valueAt(i); - if (packet == NULL) { - if (mPackets.empty()) { - break; + AString status; + if (value & STATUS_REQUESTED_RETRANSMISSION) { + status.append("retrans "); + } + if (value & STATUS_ARRIVED_LATE) { + status.append("arrived-late "); } + ALOGI("Packet %d declared lost %s", key, status.c_str()); + } +#endif + } + + sp packet; + while ((packet = getNextPacket()) != NULL) { + if (mDeclareLostTimerPending) { + cancelTimers(); + } + + CHECK_GE(mAwaitingExtSeqNo, 0); +#if TRACK_PACKET_LOSS + mLostPackets.removeItem(mAwaitingExtSeqNo); +#endif - CHECK_GE(mAwaitingExtSeqNo, 0); + int32_t packetType; + CHECK(packet->meta()->findInt32("PT", &packetType)); - const sp &firstPacket = *mPackets.begin(); + if (packetType != mActivePacketType) { + mActiveAssembler = mReceiver->makeAssembler(packetType); + mActivePacketType = packetType; + } - uint32_t rtpTime; - CHECK(firstPacket->meta()->findInt32( - "rtp-time", (int32_t *)&rtpTime)); + if (mActiveAssembler != NULL) { + status_t err = mActiveAssembler->processPacket(packet); + if (err != OK) { + ALOGV("assembler returned error %d", err); + } + } + ++mAwaitingExtSeqNo; + } - int64_t rtpUs = (rtpTime * 100ll) / 9ll; + if (mDeclareLostTimerPending) { + return; + } - int64_t maxArrivalTimeUs = - mFirstArrivalTimeUs + rtpUs - mFirstRTPTimeUs; + if (mPackets.empty()) { + return; + } - int64_t nowUs = ALooper::GetNowUs(); + CHECK_GE(mAwaitingExtSeqNo, 0); - CHECK_LT(mAwaitingExtSeqNo, firstPacket->int32Data()); + const sp &firstPacket = *mPackets.begin(); - ALOGV("waiting for %d, comparing against %d, %lld us left", - mAwaitingExtSeqNo, - firstPacket->int32Data(), - maxArrivalTimeUs - nowUs); + uint32_t rtpTime; + CHECK(firstPacket->meta()->findInt32( + "rtp-time", (int32_t *)&rtpTime)); - if (maxArrivalTimeUs + kPacketLostAfterUs <= nowUs) { - ALOGV("Lost packet extSeqNo %d %s", - mAwaitingExtSeqNo, - mRequestedRetransmission ? "*" : ""); - mRequestedRetransmission = false; - if (mActiveAssembler != NULL) { - mActiveAssembler->signalDiscontinuity(); - } + int64_t rtpUs = (rtpTime * 100ll) / 9ll; - // resync(); - ++mAwaitingExtSeqNo; - ++mNumDeclaredLost; - - mReceiver->notifyPacketLost(); - continue; - } else if (kRequestRetransmissionAfterUs > 0 - && maxArrivalTimeUs + kRequestRetransmissionAfterUs <= nowUs - && !mRequestedRetransmission - && mAwaitingExtSeqNo >= 0) { - mRequestedRetransmission = true; - mReceiver->requestRetransmission(mSSRC, mAwaitingExtSeqNo); - break; - } else { - break; - } - } + int64_t maxArrivalTimeUs = + mFirstArrivalTimeUs + rtpUs - mFirstRTPTimeUs; - mRequestedRetransmission = false; + nowUs = ALooper::GetNowUs(); - int32_t packetType; - CHECK(packet->meta()->findInt32("PT", &packetType)); + CHECK_LT(mAwaitingExtSeqNo, firstPacket->int32Data()); - if (packetType != mActivePacketType) { - mActiveAssembler = mReceiver->makeAssembler(packetType); - mActivePacketType = packetType; - } + ALOGV("waiting for %d, comparing against %d, %lld us left", + mAwaitingExtSeqNo, + firstPacket->int32Data(), + maxArrivalTimeUs - nowUs); - if (mActiveAssembler == NULL) { - continue; - } + postDeclareLostTimer(maxArrivalTimeUs + kPacketLostAfterUs); - status_t err = mActiveAssembler->processPacket(packet); - if (err != OK) { - ALOGV("assembler returned error %d", err); - } + if (kRequestRetransmissionAfterUs > 0ll) { + postRetransmitTimer( + maxArrivalTimeUs + kRequestRetransmissionAfterUs); } } @@ -328,8 +421,6 @@ sp RTPReceiver::Source::getNextPacket() { sp packet = *mPackets.begin(); mPackets.erase(mPackets.begin()); - ++mAwaitingExtSeqNo; - return packet; } @@ -404,9 +495,11 @@ void RTPReceiver::Source::addReportBlock( RTPReceiver::RTPReceiver( const sp &netSession, - const sp ¬ify) + const sp ¬ify, + uint32_t flags) : mNetSession(netSession), mNotify(notify), + mFlags(flags), mRTPMode(TRANSPORT_UNDEFINED), mRTCPMode(TRANSPORT_UNDEFINED), mRTPSessionID(0), @@ -693,6 +786,20 @@ void RTPReceiver::onNetNotify(bool isRTP, const sp &msg) { CHECK(msg->findBuffer("data", &data)); if (isRTP) { + if (mFlags & FLAG_AUTO_CONNECT) { + AString fromAddr; + CHECK(msg->findString("fromAddr", &fromAddr)); + + int32_t fromPort; + CHECK(msg->findInt32("fromPort", &fromPort)); + + CHECK_EQ((status_t)OK, + connect( + fromAddr.c_str(), fromPort, fromPort + 1)); + + mFlags &= ~FLAG_AUTO_CONNECT; + } + onRTPData(data); } else { onRTCPData(data); @@ -835,6 +942,8 @@ status_t RTPReceiver::onRTPData(const sp &buffer) { sp source; if (index < 0) { source = new Source(this, srcId); + looper()->registerHandler(source); + mSources.add(srcId, source); } else { source = mSources.valueAt(index); @@ -965,6 +1074,7 @@ sp RTPReceiver::makeAssembler(uint8_t packetType) { PacketizationMode mode = mPacketTypes.valueAt(index); switch (mode) { + case PACKETIZATION_NONE: case PACKETIZATION_TRANSPORT_STREAM: return new TSAssembler(mNotify); @@ -1005,5 +1115,39 @@ void RTPReceiver::requestRetransmission(uint32_t senderSSRC, int32_t extSeqNo) { mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size()); } +void RTPReceiver::Source::modifyPacketStatus(int32_t extSeqNo, uint32_t mask) { +#if TRACK_PACKET_LOSS + ssize_t index = mLostPackets.indexOfKey(extSeqNo); + if (index < 0) { + mLostPackets.add(extSeqNo, mask); + } else { + mLostPackets.editValueAt(index) |= mask; + } +#endif +} + +void RTPReceiver::Source::postRetransmitTimer(int64_t timeUs) { + int64_t delayUs = timeUs - ALooper::GetNowUs(); + sp msg = new AMessage(kWhatRetransmit, id()); + msg->setInt32("generation", mRetransmitGeneration); + msg->post(delayUs); +} + +void RTPReceiver::Source::postDeclareLostTimer(int64_t timeUs) { + CHECK(!mDeclareLostTimerPending); + mDeclareLostTimerPending = true; + + int64_t delayUs = timeUs - ALooper::GetNowUs(); + sp msg = new AMessage(kWhatDeclareLost, id()); + msg->setInt32("generation", mDeclareLostGeneration); + msg->post(delayUs); +} + +void RTPReceiver::Source::cancelTimers() { + ++mRetransmitGeneration; + ++mDeclareLostGeneration; + mDeclareLostTimerPending = false; +} + } // namespace android diff --git a/media/libstagefright/wifi-display/rtp/RTPReceiver.h b/media/libstagefright/wifi-display/rtp/RTPReceiver.h index 630bce9..240ab2e 100644 --- a/media/libstagefright/wifi-display/rtp/RTPReceiver.h +++ b/media/libstagefright/wifi-display/rtp/RTPReceiver.h @@ -39,9 +39,14 @@ struct RTPReceiver : public RTPBase, public AHandler { kWhatAccessUnit, kWhatPacketLost, }; + + enum Flags { + FLAG_AUTO_CONNECT = 1, + }; RTPReceiver( const sp &netSession, - const sp ¬ify); + const sp ¬ify, + uint32_t flags = 0); status_t registerPacketType( uint8_t packetType, PacketizationMode mode); @@ -82,6 +87,7 @@ private: sp mNetSession; sp mNotify; + uint32_t mFlags; TransportMode mRTPMode; TransportMode mRTCPMode; int32_t mRTPSessionID; -- cgit v1.1