From 8d342970108926c4ea355c90d26a2a353ec0fd47 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Fri, 27 Aug 2010 13:29:08 -0700 Subject: Better support for rtsp (normal play-)time display. Better seek support, timeout if no packets arrive for too long. Change-Id: Id491541a6ae501604cda815f8e961a3bfe26db7d related-to-bug: 2556656 --- media/libstagefright/rtsp/AAMRAssembler.cpp | 6 +- media/libstagefright/rtsp/AAVCAssembler.cpp | 10 +- media/libstagefright/rtsp/AH263Assembler.cpp | 6 +- media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp | 6 +- .../rtsp/AMPEG4ElementaryAssembler.cpp | 6 +- media/libstagefright/rtsp/APacketSource.cpp | 52 ++++- media/libstagefright/rtsp/APacketSource.h | 14 ++ media/libstagefright/rtsp/ARTPAssembler.cpp | 27 +-- media/libstagefright/rtsp/ARTPAssembler.h | 5 +- media/libstagefright/rtsp/ARTSPController.cpp | 5 + media/libstagefright/rtsp/MyHandler.h | 214 +++++++++++++++++++-- 11 files changed, 296 insertions(+), 55 deletions(-) (limited to 'media/libstagefright/rtsp') diff --git a/media/libstagefright/rtsp/AAMRAssembler.cpp b/media/libstagefright/rtsp/AAMRAssembler.cpp index c56578b..154ba31 100644 --- a/media/libstagefright/rtsp/AAMRAssembler.cpp +++ b/media/libstagefright/rtsp/AAMRAssembler.cpp @@ -178,12 +178,8 @@ ARTPAssembler::AssemblyStatus AAMRAssembler::addPacket( } } - uint64_t ntpTime; - CHECK(buffer->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - sp accessUnit = new ABuffer(totalSize); - accessUnit->meta()->setInt64("ntp-time", ntpTime); + CopyTimes(accessUnit, buffer); size_t dstOffset = 0; for (size_t i = 0; i < tableOfContents.size(); ++i) { diff --git a/media/libstagefright/rtsp/AAVCAssembler.cpp b/media/libstagefright/rtsp/AAVCAssembler.cpp index b22de2c..6b1e292 100644 --- a/media/libstagefright/rtsp/AAVCAssembler.cpp +++ b/media/libstagefright/rtsp/AAVCAssembler.cpp @@ -155,7 +155,7 @@ bool AAVCAssembler::addSingleTimeAggregationPacket(const sp &buffer) { sp unit = new ABuffer(nalSize); memcpy(unit->data(), &data[2], nalSize); - PropagateTimes(buffer, unit); + CopyTimes(unit, buffer); addSingleNALUnit(unit); @@ -287,7 +287,7 @@ ARTPAssembler::AssemblyStatus AAVCAssembler::addFragmentedNALUnit( ++totalSize; sp unit = new ABuffer(totalSize); - PropagateTimes(buffer, unit); + CopyTimes(unit, *queue->begin()); unit->data()[0] = (nri << 5) | nalType; @@ -325,10 +325,6 @@ void AAVCAssembler::submitAccessUnit() { LOG(VERBOSE) << "Access unit complete (" << mNALUnits.size() << " nal units)"; #endif - uint64_t ntpTime; - CHECK((*mNALUnits.begin())->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - size_t totalSize = 0; for (List >::iterator it = mNALUnits.begin(); it != mNALUnits.end(); ++it) { @@ -347,7 +343,7 @@ void AAVCAssembler::submitAccessUnit() { offset += nal->size(); } - accessUnit->meta()->setInt64("ntp-time", ntpTime); + CopyTimes(accessUnit, *mNALUnits.begin()); #if 0 printf(mAccessUnitDamaged ? "X" : "."); diff --git a/media/libstagefright/rtsp/AH263Assembler.cpp b/media/libstagefright/rtsp/AH263Assembler.cpp index 2818041..498295c 100644 --- a/media/libstagefright/rtsp/AH263Assembler.cpp +++ b/media/libstagefright/rtsp/AH263Assembler.cpp @@ -128,10 +128,6 @@ void AH263Assembler::submitAccessUnit() { LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)"; #endif - uint64_t ntpTime; - CHECK((*mPackets.begin())->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - size_t totalSize = 0; List >::iterator it = mPackets.begin(); while (it != mPackets.end()) { @@ -155,7 +151,7 @@ void AH263Assembler::submitAccessUnit() { ++it; } - accessUnit->meta()->setInt64("ntp-time", ntpTime); + CopyTimes(accessUnit, *mPackets.begin()); #if 0 printf(mAccessUnitDamaged ? "X" : "."); diff --git a/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp b/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp index 6e46361..b0d2c64 100644 --- a/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp +++ b/media/libstagefright/rtsp/AMPEG4AudioAssembler.cpp @@ -103,10 +103,6 @@ void AMPEG4AudioAssembler::submitAccessUnit() { LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " packets)"; #endif - uint64_t ntpTime; - CHECK((*mPackets.begin())->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - size_t totalSize = 0; List >::iterator it = mPackets.begin(); while (it != mPackets.end()) { @@ -142,7 +138,7 @@ void AMPEG4AudioAssembler::submitAccessUnit() { ++it; } - accessUnit->meta()->setInt64("ntp-time", ntpTime); + CopyTimes(accessUnit, *mPackets.begin()); #if 0 printf(mAccessUnitDamaged ? "X" : "."); diff --git a/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp index 7e633d7..7dd3e3f 100644 --- a/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp +++ b/media/libstagefright/rtsp/AMPEG4ElementaryAssembler.cpp @@ -101,10 +101,6 @@ void AMPEG4ElementaryAssembler::submitAccessUnit() { LOG(VERBOSE) << "Access unit complete (" << mPackets.size() << " nal units)"; #endif - uint64_t ntpTime; - CHECK((*mPackets.begin())->meta()->findInt64( - "ntp-time", (int64_t *)&ntpTime)); - size_t totalSize = 0; for (List >::iterator it = mPackets.begin(); it != mPackets.end(); ++it) { @@ -120,7 +116,7 @@ void AMPEG4ElementaryAssembler::submitAccessUnit() { offset += nal->size(); } - accessUnit->meta()->setInt64("ntp-time", ntpTime); + CopyTimes(accessUnit, *mPackets.begin()); #if 0 printf(mAccessUnitDamaged ? "X" : "."); diff --git a/media/libstagefright/rtsp/APacketSource.cpp b/media/libstagefright/rtsp/APacketSource.cpp index b930184..2d7738b 100644 --- a/media/libstagefright/rtsp/APacketSource.cpp +++ b/media/libstagefright/rtsp/APacketSource.cpp @@ -402,16 +402,41 @@ sp MakeMPEG4VideoCodecSpecificData( return csd; } +static bool GetClockRate(const AString &desc, uint32_t *clockRate) { + ssize_t slashPos = desc.find("/"); + if (slashPos < 0) { + return false; + } + + const char *s = desc.c_str() + slashPos + 1; + + char *end; + unsigned long x = strtoul(s, &end, 10); + + if (end == s || (*end != '\0' && *end != '/')) { + return false; + } + + *clockRate = x; + + return true; +} + APacketSource::APacketSource( const sp &sessionDesc, size_t index) : mInitCheck(NO_INIT), mFormat(new MetaData), - mEOSResult(OK) { + mEOSResult(OK), + mRTPTimeBase(0), + mNormalPlayTimeBaseUs(0), + mLastNormalPlayTimeUs(0) { unsigned long PT; AString desc; AString params; sessionDesc->getFormatType(index, &PT, &desc, ¶ms); + CHECK(GetClockRate(desc, &mClockRate)); + int64_t durationUs; if (sessionDesc->getDurationUs(&durationUs)) { mFormat->setInt64(kKeyDuration, durationUs); @@ -571,6 +596,8 @@ status_t APacketSource::read( if (!mBuffers.empty()) { const sp buffer = *mBuffers.begin(); + updateNormalPlayTime_l(buffer); + MediaBuffer *mediaBuffer = new MediaBuffer(buffer->size()); int64_t timeUs; @@ -588,6 +615,16 @@ status_t APacketSource::read( return mEOSResult; } +void APacketSource::updateNormalPlayTime_l(const sp &buffer) { + uint32_t rtpTime; + CHECK(buffer->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + mLastNormalPlayTimeUs = + (((double)rtpTime - (double)mRTPTimeBase) / mClockRate) + * 1000000ll + + mNormalPlayTimeBaseUs; +} + void APacketSource::queueAccessUnit(const sp &buffer) { int32_t damaged; if (buffer->meta()->findInt32("damaged", &damaged) && damaged) { @@ -613,4 +650,17 @@ void APacketSource::flushQueue() { mBuffers.clear(); } +int64_t APacketSource::getNormalPlayTimeUs() { + Mutex::Autolock autoLock(mLock); + return mLastNormalPlayTimeUs; +} + +void APacketSource::setNormalPlayTimeMapping( + uint32_t rtpTime, int64_t normalPlayTimeUs) { + Mutex::Autolock autoLock(mLock); + + mRTPTimeBase = rtpTime; + mNormalPlayTimeBaseUs = normalPlayTimeUs; +} + } // namespace android diff --git a/media/libstagefright/rtsp/APacketSource.h b/media/libstagefright/rtsp/APacketSource.h index 197af3e..3833ab1 100644 --- a/media/libstagefright/rtsp/APacketSource.h +++ b/media/libstagefright/rtsp/APacketSource.h @@ -45,6 +45,11 @@ struct APacketSource : public MediaSource { void flushQueue(); + int64_t getNormalPlayTimeUs(); + + void setNormalPlayTimeMapping( + uint32_t rtpTime, int64_t normalPlayTimeUs); + protected: virtual ~APacketSource(); @@ -58,6 +63,15 @@ private: List > mBuffers; status_t mEOSResult; + uint32_t mClockRate; + + uint32_t mRTPTimeBase; + int64_t mNormalPlayTimeBaseUs; + + int64_t mLastNormalPlayTimeUs; + + void updateNormalPlayTime_l(const sp &buffer); + DISALLOW_EVIL_CONSTRUCTORS(APacketSource); }; diff --git a/media/libstagefright/rtsp/ARTPAssembler.cpp b/media/libstagefright/rtsp/ARTPAssembler.cpp index 24225b8..9ba2b37 100644 --- a/media/libstagefright/rtsp/ARTPAssembler.cpp +++ b/media/libstagefright/rtsp/ARTPAssembler.cpp @@ -35,18 +35,6 @@ ARTPAssembler::ARTPAssembler() : mFirstFailureTimeUs(-1) { } -void ARTPAssembler::PropagateTimes( - const sp &from, const sp &to) { - uint32_t rtpTime; - CHECK(from->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); - - uint64_t ntpTime = 0; - CHECK(from->meta()->findInt64("ntp-time", (int64_t *)&ntpTime)); - - to->meta()->setInt32("rtp-time", rtpTime); - to->meta()->setInt64("ntp-time", ntpTime); -} - void ARTPAssembler::onPacketReceived(const sp &source) { AssemblyStatus status; for (;;) { @@ -75,4 +63,19 @@ void ARTPAssembler::onPacketReceived(const sp &source) { } } +// static +void ARTPAssembler::CopyTimes(const sp &to, const sp &from) { + uint64_t ntpTime; + CHECK(from->meta()->findInt64("ntp-time", (int64_t *)&ntpTime)); + + uint32_t rtpTime; + CHECK(from->meta()->findInt32("rtp-time", (int32_t *)&rtpTime)); + + to->meta()->setInt64("ntp-time", ntpTime); + to->meta()->setInt32("rtp-time", rtpTime); + + // Copy the seq number. + to->setInt32Data(from->int32Data()); +} + } // namespace android diff --git a/media/libstagefright/rtsp/ARTPAssembler.h b/media/libstagefright/rtsp/ARTPAssembler.h index e598088..70ea186 100644 --- a/media/libstagefright/rtsp/ARTPAssembler.h +++ b/media/libstagefright/rtsp/ARTPAssembler.h @@ -40,12 +40,11 @@ struct ARTPAssembler : public RefBase { virtual void onByeReceived() = 0; protected: - static void PropagateTimes( - const sp &from, const sp &to); - virtual AssemblyStatus assembleMore(const sp &source) = 0; virtual void packetLost() = 0; + static void CopyTimes(const sp &to, const sp &from); + private: int64_t mFirstFailureTimeUs; diff --git a/media/libstagefright/rtsp/ARTSPController.cpp b/media/libstagefright/rtsp/ARTSPController.cpp index 9df17cb..a89946b 100644 --- a/media/libstagefright/rtsp/ARTSPController.cpp +++ b/media/libstagefright/rtsp/ARTSPController.cpp @@ -138,4 +138,9 @@ void ARTSPController::onMessageReceived(const sp &msg) { } } +int64_t ARTSPController::getNormalPlayTimeUs() { + CHECK(mHandler != NULL); + return mHandler->getNormalPlayTimeUs(); +} + } // namespace android diff --git a/media/libstagefright/rtsp/MyHandler.h b/media/libstagefright/rtsp/MyHandler.h index b10c881..0685a47 100644 --- a/media/libstagefright/rtsp/MyHandler.h +++ b/media/libstagefright/rtsp/MyHandler.h @@ -23,6 +23,8 @@ #include "ARTSPConnection.h" #include "ASessionDescription.h" +#include + #include #include #include @@ -33,6 +35,34 @@ namespace android { +static bool GetAttribute(const char *s, const char *key, AString *value) { + value->clear(); + + size_t keyLen = strlen(key); + + for (;;) { + while (isspace(*s)) { + ++s; + } + + const char *colonPos = strchr(s, ';'); + + size_t len = + (colonPos == NULL) ? strlen(s) : colonPos - s; + + if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { + value->setTo(&s[keyLen + 1], len - keyLen - 1); + return true; + } + + if (colonPos == NULL) { + return false; + } + + s = colonPos + 1; + } +} + struct MyHandler : public AHandler { MyHandler(const char *url, const sp &looper) : mLooper(looper), @@ -43,7 +73,9 @@ struct MyHandler : public AHandler { mSetupTracksSuccessful(false), mSeekPending(false), mFirstAccessUnit(true), - mFirstAccessUnitNTP(0) { + mFirstAccessUnitNTP(0), + mNumAccessUnitsReceived(0), + mCheckPending(false) { mNetLooper->start(false /* runOnCallingThread */, false /* canCallJava */, @@ -76,6 +108,20 @@ struct MyHandler : public AHandler { msg->post(); } + int64_t getNormalPlayTimeUs() { + int64_t maxTimeUs = 0; + for (size_t i = 0; i < mTracks.size(); ++i) { + int64_t timeUs = mTracks.editItemAt(i).mPacketSource + ->getNormalPlayTimeUs(); + + if (i == 0 || timeUs > maxTimeUs) { + maxTimeUs = timeUs; + } + } + + return maxTimeUs; + } + virtual void onMessageReceived(const sp &msg) { switch (msg->what()) { case 'conn': @@ -269,6 +315,8 @@ struct MyHandler : public AHandler { CHECK_EQ(response->mStatusCode, 200u); + parsePlayResponse(response); + mDoneMsg->setInt32("result", OK); mDoneMsg->post(); mDoneMsg = NULL; @@ -332,16 +380,38 @@ struct MyHandler : public AHandler { break; } + case 'chek': + { + if (mNumAccessUnitsReceived == 0) { + LOG(INFO) << "stream ended? aborting."; + (new AMessage('abor', id()))->post(); + break; + } + + mNumAccessUnitsReceived = 0; + msg->post(500000); + break; + } + case 'accu': { + ++mNumAccessUnitsReceived; + + if (!mCheckPending) { + mCheckPending = true; + sp check = new AMessage('chek', id()); + check->post(500000); + } + size_t trackIndex; CHECK(msg->findSize("track-index", &trackIndex)); + TrackInfo *track = &mTracks.editItemAt(trackIndex); + int32_t eos; if (msg->findInt32("eos", &eos)) { LOG(INFO) << "received BYE on track index " << trackIndex; #if 0 - TrackInfo *track = &mTracks.editItemAt(trackIndex); track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); #endif return; @@ -352,10 +422,32 @@ struct MyHandler : public AHandler { sp accessUnit = static_cast(obj.get()); + uint32_t seqNum = (uint32_t)accessUnit->int32Data(); + + if (seqNum < track->mFirstSeqNumInSegment) { + LOG(INFO) << "dropping stale access-unit " + << "(" << seqNum << " < " + << track->mFirstSeqNumInSegment << ")"; + break; + } + uint64_t ntpTime; CHECK(accessUnit->meta()->findInt64( "ntp-time", (int64_t *)&ntpTime)); + uint32_t rtpTime; + CHECK(accessUnit->meta()->findInt32( + "rtp-time", (int32_t *)&rtpTime)); + + if (track->mNewSegment) { + track->mNewSegment = false; + + LOG(VERBOSE) << "first segment unit ntpTime=" + << StringPrintf("0x%016llx", ntpTime) + << " rtpTime=" << rtpTime + << " seq=" << seqNum; + } + if (mFirstAccessUnit) { mFirstAccessUnit = false; mFirstAccessUnitNTP = ntpTime; @@ -414,6 +506,11 @@ struct MyHandler : public AHandler { case 'see1': { + // Session is paused now. + for (size_t i = 0; i < mTracks.size(); ++i) { + mTracks.editItemAt(i).mPacketSource->flushQueue(); + } + int64_t timeUs; CHECK(msg->findInt64("time", &timeUs)); @@ -440,15 +537,13 @@ struct MyHandler : public AHandler { { CHECK(mSeekPending); - LOG(INFO) << "seek completed."; - mSeekPending = false; - int32_t result; CHECK(msg->findInt32("result", &result)); - if (result != OK) { - LOG(ERROR) << "seek FAILED"; - break; - } + + LOG(INFO) << "PLAY completed with result " + << result << " (" << strerror(-result) << ")"; + + CHECK_EQ(result, (status_t)OK); sp obj; CHECK(msg->findObject("response", &obj)); @@ -457,9 +552,10 @@ struct MyHandler : public AHandler { CHECK_EQ(response->mStatusCode, 200u); - for (size_t i = 0; i < mTracks.size(); ++i) { - mTracks.editItemAt(i).mPacketSource->flushQueue(); - } + parsePlayResponse(response); + + LOG(INFO) << "seek completed."; + mSeekPending = false; break; } @@ -491,6 +587,90 @@ struct MyHandler : public AHandler { } } + static void SplitString( + const AString &s, const char *separator, List *items) { + items->clear(); + size_t start = 0; + while (start < s.size()) { + ssize_t offset = s.find(separator, start); + + if (offset < 0) { + items->push_back(AString(s, start, s.size() - start)); + break; + } + + items->push_back(AString(s, start, offset - start)); + start = offset + strlen(separator); + } + } + + void parsePlayResponse(const sp &response) { + ssize_t i = response->mHeaders.indexOfKey("range"); + if (i < 0) { + // Server doesn't even tell use what range it is going to + // play, therefore we won't support seeking. + return; + } + + AString range = response->mHeaders.valueAt(i); + LOG(VERBOSE) << "Range: " << range; + + AString val; + CHECK(GetAttribute(range.c_str(), "npt", &val)); + float npt1, npt2; + + if (val == "now-") { + // This is a live stream and therefore not seekable. + return; + } else { + CHECK_EQ(sscanf(val.c_str(), "%f-%f", &npt1, &npt2), 2); + } + + i = response->mHeaders.indexOfKey("rtp-info"); + CHECK_GE(i, 0); + + AString rtpInfo = response->mHeaders.valueAt(i); + List streamInfos; + SplitString(rtpInfo, ",", &streamInfos); + + int n = 1; + for (List::iterator it = streamInfos.begin(); + it != streamInfos.end(); ++it) { + (*it).trim(); + LOG(VERBOSE) << "streamInfo[" << n << "] = " << *it; + + CHECK(GetAttribute((*it).c_str(), "url", &val)); + + size_t trackIndex = 0; + while (trackIndex < mTracks.size() + && !(val == mTracks.editItemAt(trackIndex).mURL)) { + ++trackIndex; + } + CHECK_LT(trackIndex, mTracks.size()); + + CHECK(GetAttribute((*it).c_str(), "seq", &val)); + + char *end; + unsigned long seq = strtoul(val.c_str(), &end, 10); + + TrackInfo *info = &mTracks.editItemAt(trackIndex); + info->mFirstSeqNumInSegment = seq; + info->mNewSegment = true; + + CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); + + uint32_t rtpTime = strtoul(val.c_str(), &end, 10); + + LOG(VERBOSE) << "track #" << n + << ": rtpTime=" << rtpTime << " <=> npt=" << npt1; + + info->mPacketSource->setNormalPlayTimeMapping( + rtpTime, (int64_t)(npt1 * 1E6)); + + ++n; + } + } + sp getPacketSource(size_t index) { CHECK_GE(index, 0u); CHECK_LT(index, mTracks.size()); @@ -515,11 +695,16 @@ private: bool mSeekPending; bool mFirstAccessUnit; uint64_t mFirstAccessUnitNTP; + int64_t mNumAccessUnitsReceived; + bool mCheckPending; struct TrackInfo { + AString mURL; int mRTPSocket; int mRTCPSocket; bool mUsingInterleavedTCP; + uint32_t mFirstSeqNumInSegment; + bool mNewSegment; sp mPacketSource; }; @@ -549,8 +734,13 @@ private: mTracks.push(TrackInfo()); TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); + info->mURL = trackURL; info->mPacketSource = source; info->mUsingInterleavedTCP = false; + info->mFirstSeqNumInSegment = 0; + info->mNewSegment = true; + + LOG(VERBOSE) << "track #" << mTracks.size() << " URL=" << trackURL; AString request = "SETUP "; request.append(trackURL); -- cgit v1.1