diff options
Diffstat (limited to 'media/libstagefright/httplive')
-rw-r--r-- | media/libstagefright/httplive/Android.mk | 2 | ||||
-rw-r--r-- | media/libstagefright/httplive/LiveSession.cpp | 324 | ||||
-rw-r--r-- | media/libstagefright/httplive/LiveSession.h | 34 | ||||
-rw-r--r-- | media/libstagefright/httplive/M3UParser.cpp | 123 | ||||
-rw-r--r-- | media/libstagefright/httplive/M3UParser.h | 7 | ||||
-rw-r--r-- | media/libstagefright/httplive/PlaylistFetcher.cpp | 379 | ||||
-rw-r--r-- | media/libstagefright/httplive/PlaylistFetcher.h | 24 |
7 files changed, 659 insertions, 234 deletions
diff --git a/media/libstagefright/httplive/Android.mk b/media/libstagefright/httplive/Android.mk index f3529f9..e8d558c 100644 --- a/media/libstagefright/httplive/Android.mk +++ b/media/libstagefright/httplive/Android.mk @@ -13,6 +13,8 @@ LOCAL_C_INCLUDES:= \ $(TOP)/frameworks/native/include/media/openmax \ $(TOP)/external/openssl/include +LOCAL_CFLAGS += -Werror + LOCAL_SHARED_LIBRARIES := \ libbinder \ libcrypto \ diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index 6d48ab7..7b18348 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -27,6 +27,8 @@ #include "mpeg2ts/AnotherPacketSource.h" #include <cutils/properties.h> +#include <media/IMediaHTTPConnection.h> +#include <media/IMediaHTTPService.h> #include <media/stagefright/foundation/hexdump.h> #include <media/stagefright/foundation/ABuffer.h> #include <media/stagefright/foundation/ADebug.h> @@ -34,6 +36,7 @@ #include <media/stagefright/DataSource.h> #include <media/stagefright/FileSource.h> #include <media/stagefright/MediaErrors.h> +#include <media/stagefright/MediaHTTP.h> #include <media/stagefright/MetaData.h> #include <media/stagefright/Utils.h> @@ -47,18 +50,14 @@ namespace android { LiveSession::LiveSession( - const sp<AMessage> ¬ify, uint32_t flags, bool uidValid, uid_t uid) + const sp<AMessage> ¬ify, uint32_t flags, + const sp<IMediaHTTPService> &httpService) : mNotify(notify), mFlags(flags), - mUIDValid(uidValid), - mUID(uid), + mHTTPService(httpService), mInPreparationPhase(true), - mHTTPDataSource( - HTTPBase::Create( - (mFlags & kFlagIncognito) - ? HTTPBase::kFlagIncognito - : 0)), - mPrevBandwidthIndex(-1), + mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), + mCurBandwidthIndex(-1), mStreamMask(0), mNewStreamMask(0), mSwapMask(0), @@ -69,16 +68,17 @@ LiveSession::LiveSession( mReconfigurationInProgress(false), mSwitchInProgress(false), mDisconnectReplyID(0), - mSeekReplyID(0) { - if (mUIDValid) { - mHTTPDataSource->setUID(mUID); - } + mSeekReplyID(0), + mFirstTimeUsValid(false), + mFirstTimeUs(0), + mLastSeekTimeUs(0) { mStreams[kAudioIndex] = StreamItem("audio"); mStreams[kVideoIndex] = StreamItem("video"); mStreams[kSubtitleIndex] = StreamItem("subtitles"); for (size_t i = 0; i < kMaxStreams; ++i) { + mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); } @@ -113,31 +113,65 @@ status_t LiveSession::dequeueAccessUnit( return -EWOULDBLOCK; } + status_t finalResult; + sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); + if (discontinuityQueue->hasBufferAvailable(&finalResult)) { + discontinuityQueue->dequeueAccessUnit(accessUnit); + // seeking, track switching + sp<AMessage> extra; + int64_t timeUs; + if ((*accessUnit)->meta()->findMessage("extra", &extra) + && extra != NULL + && extra->findInt64("timeUs", &timeUs)) { + // seeking only + mLastSeekTimeUs = timeUs; + mDiscontinuityOffsetTimesUs.clear(); + mDiscontinuityAbsStartTimesUs.clear(); + } + return INFO_DISCONTINUITY; + } + sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); - status_t finalResult; if (!packetSource->hasBufferAvailable(&finalResult)) { return finalResult == OK ? -EAGAIN : finalResult; } + // wait for counterpart + sp<AnotherPacketSource> otherSource; + if (stream == STREAMTYPE_AUDIO && (mStreamMask & STREAMTYPE_VIDEO)) { + otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); + } else if (stream == STREAMTYPE_VIDEO && (mStreamMask & STREAMTYPE_AUDIO)) { + otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); + } + if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { + return finalResult == OK ? -EAGAIN : finalResult; + } + status_t err = packetSource->dequeueAccessUnit(accessUnit); + size_t streamIdx; const char *streamStr; switch (stream) { case STREAMTYPE_AUDIO: + streamIdx = kAudioIndex; streamStr = "audio"; break; case STREAMTYPE_VIDEO: + streamIdx = kVideoIndex; streamStr = "video"; break; case STREAMTYPE_SUBTITLES: + streamIdx = kSubtitleIndex; streamStr = "subs"; break; default: TRESPASS(); } + StreamItem& strm = mStreams[streamIdx]; if (err == INFO_DISCONTINUITY) { + // adaptive streaming, discontinuities in the playlist int32_t type; CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); @@ -152,10 +186,7 @@ status_t LiveSession::dequeueAccessUnit( extra == NULL ? "NULL" : extra->debugString().c_str()); int32_t swap; - if (type == ATSParser::DISCONTINUITY_FORMATCHANGE - && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) - && swap) { - + if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { int32_t switchGeneration; CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); { @@ -168,13 +199,67 @@ status_t LiveSession::dequeueAccessUnit( msg->post(); } } + } else { + size_t seq = strm.mCurDiscontinuitySeq; + int64_t offsetTimeUs; + if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { + offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); + } else { + offsetTimeUs = 0; + } + + seq += 1; + if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + int64_t firstTimeUs; + firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); + offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; + offsetTimeUs += strm.mLastSampleDurationUs; + } else { + offsetTimeUs += strm.mLastSampleDurationUs; + } + + mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); } } else if (err == OK) { + if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { int64_t timeUs; + int32_t discontinuitySeq = 0; CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); - ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); + (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); + strm.mCurDiscontinuitySeq = discontinuitySeq; + + int32_t discard = 0; + int64_t firstTimeUs; + if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + int64_t durUs; // approximate sample duration + if (timeUs > strm.mLastDequeuedTimeUs) { + durUs = timeUs - strm.mLastDequeuedTimeUs; + } else { + durUs = strm.mLastDequeuedTimeUs - timeUs; + } + strm.mLastSampleDurationUs = durUs; + firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); + } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { + firstTimeUs = timeUs; + } else { + mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); + firstTimeUs = timeUs; + } + + strm.mLastDequeuedTimeUs = timeUs; + if (timeUs >= firstTimeUs) { + timeUs -= firstTimeUs; + } else { + timeUs = 0; + } + timeUs += mLastSeekTimeUs; + if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { + timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); + } + ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); + (*accessUnit)->meta()->setInt64("timeUs", timeUs); mLastDequeuedTimeUs = timeUs; mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; } else if (stream == STREAMTYPE_SUBTITLES) { @@ -293,7 +378,9 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } - tryToFinishBandwidthSwitch(); + if (mSwitchInProgress) { + tryToFinishBandwidthSwitch(); + } } if (mContinuation != NULL) { @@ -481,11 +568,8 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { headers = NULL; } -#if 1 - ALOGI("onConnect <URL suppressed>"); -#else - ALOGI("onConnect %s", url.c_str()); -#endif + // TODO currently we don't know if we are coming here from incognito mode + ALOGI("onConnect %s", uriDebugString(url).c_str()); mMasterURL = url; @@ -493,7 +577,7 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); if (mPlaylist == NULL) { - ALOGE("unable to fetch master playlist <URL suppressed>."); + ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); postPrepared(ERROR_IO); return; @@ -545,8 +629,9 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { mBandwidthItems.push(item); } + mPlaylist->pickRandomMediaItems(); changeConfiguration( - 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); + 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); } void LiveSession::finishDisconnect() { @@ -680,7 +765,7 @@ ssize_t LiveSession::fetchFile( ssize_t bytesRead = 0; // adjust range_length if only reading partial block - if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) { + if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { range_length = buffer->size() + block_size; } for (;;) { @@ -854,20 +939,20 @@ size_t LiveSession::getBandwidthIndex() { // to lowest) const size_t kMinIndex = 0; - static ssize_t mPrevBandwidthIndex = -1; + static ssize_t mCurBandwidthIndex = -1; size_t index; - if (mPrevBandwidthIndex < 0) { + if (mCurBandwidthIndex < 0) { index = kMinIndex; } else if (uniformRand() < 0.5) { - index = (size_t)mPrevBandwidthIndex; + index = (size_t)mCurBandwidthIndex; } else { - index = mPrevBandwidthIndex + 1; + index = mCurBandwidthIndex + 1; if (index == mBandwidthItems.size()) { index = kMinIndex; } } - mPrevBandwidthIndex = index; + mCurBandwidthIndex = index; #elif 0 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec @@ -933,14 +1018,29 @@ bool LiveSession::hasDynamicDuration() const { return false; } -status_t LiveSession::getTrackInfo(Parcel *reply) const { - return mPlaylist->getTrackInfo(reply); +size_t LiveSession::getTrackCount() const { + if (mPlaylist == NULL) { + return 0; + } else { + return mPlaylist->getTrackCount(); + } +} + +sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { + if (mPlaylist == NULL) { + return NULL; + } else { + return mPlaylist->getTrackInfo(trackIndex); + } } status_t LiveSession::selectTrack(size_t index, bool select) { status_t err = mPlaylist->selectTrack(index, select); if (err == OK) { - (new AMessage(kWhatChangeConfiguration, id()))->post(); + sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); + msg->setInt32("bandwidthIndex", mCurBandwidthIndex); + msg->setInt32("pickTrack", select); + msg->post(); } return err; } @@ -967,15 +1067,11 @@ void LiveSession::changeConfiguration( CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; - mPrevBandwidthIndex = bandwidthIndex; + mCurBandwidthIndex = bandwidthIndex; ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", timeUs, bandwidthIndex, pickTrack); - if (pickTrack) { - mPlaylist->pickRandomMediaItems(); - } - CHECK_LT(bandwidthIndex, mBandwidthItems.size()); const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); @@ -998,14 +1094,15 @@ void LiveSession::changeConfiguration( // If we're seeking all current fetchers are discarded. if (timeUs < 0ll) { - // delay fetcher removal - discardFetcher = false; + // delay fetcher removal if not picking tracks + discardFetcher = pickTrack; for (size_t j = 0; j < kMaxStreams; ++j) { StreamType type = indexToType(j); if ((streamMask & type) && uri == URIs[j]) { resumeMask |= type; streamMask &= ~type; + discardFetcher = false; } } } @@ -1019,16 +1116,17 @@ void LiveSession::changeConfiguration( sp<AMessage> msg; if (timeUs < 0ll) { - // skip onChangeConfiguration2 (decoder destruction) if switching. + // skip onChangeConfiguration2 (decoder destruction) if not seeking. msg = new AMessage(kWhatChangeConfiguration3, id()); } else { msg = new AMessage(kWhatChangeConfiguration2, id()); } msg->setInt32("streamMask", streamMask); msg->setInt32("resumeMask", resumeMask); + msg->setInt32("pickTrack", pickTrack); msg->setInt64("timeUs", timeUs); for (size_t i = 0; i < kMaxStreams; ++i) { - if (streamMask & indexToType(i)) { + if ((streamMask | resumeMask) & indexToType(i)) { msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); } } @@ -1052,7 +1150,10 @@ void LiveSession::changeConfiguration( void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { if (!mReconfigurationInProgress) { - changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); + int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; + msg->findInt32("pickTrack", &pickTrack); + msg->findInt32("bandwidthIndex", &bandwidthIndex); + changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); } else { msg->post(1000000ll); // retry in 1 sec } @@ -1063,8 +1164,14 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { // All fetchers are either suspended or have been removed now. - uint32_t streamMask; + uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); + CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); + + // currently onChangeConfiguration2 is only called for seeking; + // remove the following CHECK if using it else where. + CHECK_EQ(resumeMask, 0); + streamMask |= resumeMask; AString URIs[kMaxStreams]; for (size_t i = 0; i < kMaxStreams; ++i) { @@ -1128,16 +1235,21 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { } int64_t timeUs; + int32_t pickTrack; bool switching = false; CHECK(msg->findInt64("timeUs", &timeUs)); + CHECK(msg->findInt32("pickTrack", &pickTrack)); if (timeUs < 0ll) { - timeUs = mLastDequeuedTimeUs; - switching = true; + if (!pickTrack) { + switching = true; + } + mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; + } else { + mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; } - mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; - mNewStreamMask = streamMask; + mNewStreamMask = streamMask | resumeMask; // Of all existing fetchers: // * Resume fetchers that are still needed and assign them original packet sources. @@ -1150,6 +1262,16 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { for (size_t j = 0; j < kMaxStreams; ++j) { if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); + + if (j != kSubtitleIndex) { + ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); + sp<AnotherPacketSource> discontinuityQueue; + discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); + discontinuityQueue->queueDiscontinuity( + ATSParser::DISCONTINUITY_NONE, + NULL, + true); + } } } @@ -1183,7 +1305,9 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { CHECK(fetcher != NULL); int32_t latestSeq = -1; - int64_t latestTimeUs = 0ll; + int64_t startTimeUs = -1; + int64_t segmentStartTimeUs = -1ll; + int32_t discontinuitySeq = -1; sp<AnotherPacketSource> sources[kMaxStreams]; // TRICKY: looping from i as earlier streams are already removed from streamMask @@ -1191,29 +1315,65 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); - if (!switching) { + if (timeUs >= 0) { sources[j]->clear(); + startTimeUs = timeUs; + + sp<AnotherPacketSource> discontinuityQueue; + sp<AMessage> extra = new AMessage; + extra->setInt64("timeUs", timeUs); + discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); + discontinuityQueue->queueDiscontinuity( + ATSParser::DISCONTINUITY_SEEK, extra, true); } else { - int32_t type, seq; - int64_t srcTimeUs; - sp<AMessage> meta = sources[j]->getLatestMeta(); + int32_t type; + int64_t srcSegmentStartTimeUs; + sp<AMessage> meta; + if (pickTrack) { + // selecting + meta = sources[j]->getLatestDequeuedMeta(); + } else { + // adapting + meta = sources[j]->getLatestEnqueuedMeta(); + } if (meta != NULL && !meta->findInt32("discontinuity", &type)) { - CHECK(meta->findInt32("seq", &seq)); - if (seq > latestSeq) { - latestSeq = seq; + int64_t tmpUs; + CHECK(meta->findInt64("timeUs", &tmpUs)); + if (startTimeUs < 0 || tmpUs < startTimeUs) { + startTimeUs = tmpUs; } - CHECK(meta->findInt64("timeUs", &srcTimeUs)); - if (srcTimeUs > latestTimeUs) { - latestTimeUs = srcTimeUs; + + CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); + if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { + segmentStartTimeUs = tmpUs; + } + + int32_t seq; + CHECK(meta->findInt32("discontinuitySeq", &seq)); + if (discontinuitySeq < 0 || seq < discontinuitySeq) { + discontinuitySeq = seq; } } - sources[j] = mPacketSources2.valueFor(indexToType(j)); - sources[j]->clear(); - uint32_t extraStreams = mNewStreamMask & (~mStreamMask); - if (extraStreams & indexToType(j)) { - sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false)); + if (pickTrack) { + // selecting track, queue discontinuities before content + sources[j]->clear(); + if (j == kSubtitleIndex) { + break; + } + sp<AnotherPacketSource> discontinuityQueue; + discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); + discontinuityQueue->queueDiscontinuity( + ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); + } else { + // adapting, queue discontinuities after resume + sources[j] = mPacketSources2.valueFor(indexToType(j)); + sources[j]->clear(); + uint32_t extraStreams = mNewStreamMask & (~mStreamMask); + if (extraStreams & indexToType(j)) { + sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); + } } } @@ -1225,9 +1385,10 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], - timeUs, - latestTimeUs /* min start time(us) */, - latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ ); + startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, + segmentStartTimeUs, + discontinuitySeq, + switching); } // All fetchers have now been started, the configuration change @@ -1239,6 +1400,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { mReconfigurationInProgress = false; if (switching) { mSwitchInProgress = true; + mSwapMask = streamMask; } else { mStreamMask = mNewStreamMask; } @@ -1257,8 +1419,8 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) { int32_t stream; CHECK(msg->findInt32("stream", &stream)); - mSwapMask |= stream; - if (mSwapMask != mStreamMask) { + mSwapMask &= ~stream; + if (mSwapMask != 0) { return; } @@ -1274,9 +1436,12 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) { } // Mark switch done when: -// 1. all old buffers are swapped out, AND -// 2. all old fetchers are removed. +// 1. all old buffers are swapped out void LiveSession::tryToFinishBandwidthSwitch() { + if (!mSwitchInProgress) { + return; + } + bool needToRemoveFetchers = false; for (size_t i = 0; i < mFetcherInfos.size(); ++i) { if (mFetcherInfos.valueAt(i).mToBeRemoved) { @@ -1284,10 +1449,11 @@ void LiveSession::tryToFinishBandwidthSwitch() { break; } } - if (!needToRemoveFetchers && mSwapMask == mStreamMask) { + + if (!needToRemoveFetchers && mSwapMask == 0) { + ALOGI("mSwitchInProgress = false"); mStreamMask = mNewStreamMask; mSwitchInProgress = false; - mSwapMask = 0; } } @@ -1313,13 +1479,13 @@ bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { return false; } - if (mPrevBandwidthIndex < 0) { + if (mCurBandwidthIndex < 0) { return true; } - if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { + if (bandwidthIndex == (size_t)mCurBandwidthIndex) { return false; - } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { + } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { return canSwitchUp(); } else { return true; diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h index 3f8fee5..5423f0f 100644 --- a/media/libstagefright/httplive/LiveSession.h +++ b/media/libstagefright/httplive/LiveSession.h @@ -28,6 +28,7 @@ struct ABuffer; struct AnotherPacketSource; struct DataSource; struct HTTPBase; +struct IMediaHTTPService; struct LiveDataSource; struct M3UParser; struct PlaylistFetcher; @@ -40,7 +41,8 @@ struct LiveSession : public AHandler { }; LiveSession( const sp<AMessage> ¬ify, - uint32_t flags = 0, bool uidValid = false, uid_t uid = 0); + uint32_t flags, + const sp<IMediaHTTPService> &httpService); enum StreamIndex { kAudioIndex = 0, @@ -68,7 +70,8 @@ struct LiveSession : public AHandler { status_t seekTo(int64_t timeUs); status_t getDuration(int64_t *durationUs) const; - status_t getTrackInfo(Parcel *reply) const; + size_t getTrackCount() const; + sp<AMessage> getTrackInfo(size_t trackIndex) const; status_t selectTrack(size_t index, bool select); bool isSeekable() const; @@ -122,8 +125,19 @@ private: struct StreamItem { const char *mType; AString mUri; - StreamItem() : mType("") {} - StreamItem(const char *type) : mType(type) {} + size_t mCurDiscontinuitySeq; + int64_t mLastDequeuedTimeUs; + int64_t mLastSampleDurationUs; + StreamItem() + : mType(""), + mCurDiscontinuitySeq(0), + mLastDequeuedTimeUs(0), + mLastSampleDurationUs(0) {} + StreamItem(const char *type) + : mType(type), + mCurDiscontinuitySeq(0), + mLastDequeuedTimeUs(0), + mLastSampleDurationUs(0) {} AString uriKey() { AString key(mType); key.append("URI"); @@ -134,8 +148,7 @@ private: sp<AMessage> mNotify; uint32_t mFlags; - bool mUIDValid; - uid_t mUID; + sp<IMediaHTTPService> mHTTPService; bool mInPreparationPhase; @@ -145,7 +158,7 @@ private: AString mMasterURL; Vector<BandwidthItem> mBandwidthItems; - ssize_t mPrevBandwidthIndex; + ssize_t mCurBandwidthIndex; sp<M3UParser> mPlaylist; @@ -161,6 +174,7 @@ private: // we use this to track reconfiguration progress. uint32_t mSwapMask; + KeyedVector<StreamType, sp<AnotherPacketSource> > mDiscontinuities; KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources; // A second set of packet sources that buffer content for the variant we're switching to. KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2; @@ -185,6 +199,12 @@ private: uint32_t mDisconnectReplyID; uint32_t mSeekReplyID; + bool mFirstTimeUsValid; + int64_t mFirstTimeUs; + int64_t mLastSeekTimeUs; + KeyedVector<size_t, int64_t> mDiscontinuityAbsStartTimesUs; + KeyedVector<size_t, int64_t> mDiscontinuityOffsetTimesUs; + sp<PlaylistFetcher> addFetcher(const char *uri); void onConnect(const sp<AMessage> &msg); diff --git a/media/libstagefright/httplive/M3UParser.cpp b/media/libstagefright/httplive/M3UParser.cpp index 0700de0..1651dee 100644 --- a/media/libstagefright/httplive/M3UParser.cpp +++ b/media/libstagefright/httplive/M3UParser.cpp @@ -23,6 +23,7 @@ #include <cutils/properties.h> #include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/MediaDefs.h> #include <media/stagefright/MediaErrors.h> #include <media/stagefright/Utils.h> #include <media/mediaplayer.h> @@ -58,8 +59,8 @@ struct M3UParser::MediaGroup : public RefBase { void pickRandomMediaItems(); status_t selectTrack(size_t index, bool select); - void getTrackInfo(Parcel* reply) const; size_t countTracks() const; + sp<AMessage> getTrackInfo(size_t index) const; protected: virtual ~MediaGroup(); @@ -156,8 +157,8 @@ void M3UParser::MediaGroup::pickRandomMediaItems() { } status_t M3UParser::MediaGroup::selectTrack(size_t index, bool select) { - if (mType != TYPE_SUBS) { - ALOGE("only select subtitile tracks for now!"); + if (mType != TYPE_SUBS && mType != TYPE_AUDIO) { + ALOGE("only select subtitile/audio tracks for now!"); return INVALID_OPERATION; } @@ -170,49 +171,56 @@ status_t M3UParser::MediaGroup::selectTrack(size_t index, bool select) { ALOGE("track %zu already selected", index); return BAD_VALUE; } - ALOGV("selected track %d", index); + ALOGV("selected track %zu", index); mSelectedIndex = index; } else { if (mSelectedIndex != (ssize_t)index) { ALOGE("track %zu is not selected", index); return BAD_VALUE; } - ALOGV("unselected track %d", index); + ALOGV("unselected track %zu", index); mSelectedIndex = -1; } return OK; } -void M3UParser::MediaGroup::getTrackInfo(Parcel* reply) const { - for (size_t i = 0; i < mMediaItems.size(); ++i) { - reply->writeInt32(2); // 2 fields - - if (mType == TYPE_AUDIO) { - reply->writeInt32(MEDIA_TRACK_TYPE_AUDIO); - } else if (mType == TYPE_VIDEO) { - reply->writeInt32(MEDIA_TRACK_TYPE_VIDEO); - } else if (mType == TYPE_SUBS) { - reply->writeInt32(MEDIA_TRACK_TYPE_SUBTITLE); - } else { - reply->writeInt32(MEDIA_TRACK_TYPE_UNKNOWN); - } +size_t M3UParser::MediaGroup::countTracks() const { + return mMediaItems.size(); +} - const Media &item = mMediaItems.itemAt(i); - const char *lang = item.mLanguage.empty() ? "und" : item.mLanguage.c_str(); - reply->writeString16(String16(lang)); +sp<AMessage> M3UParser::MediaGroup::getTrackInfo(size_t index) const { + if (index >= mMediaItems.size()) { + return NULL; + } - if (mType == TYPE_SUBS) { - // TO-DO: pass in a MediaFormat instead - reply->writeInt32(!!(item.mFlags & MediaGroup::FLAG_AUTOSELECT)); - reply->writeInt32(!!(item.mFlags & MediaGroup::FLAG_DEFAULT)); - reply->writeInt32(!!(item.mFlags & MediaGroup::FLAG_FORCED)); - } + sp<AMessage> format = new AMessage(); + + int32_t trackType; + if (mType == TYPE_AUDIO) { + trackType = MEDIA_TRACK_TYPE_AUDIO; + } else if (mType == TYPE_VIDEO) { + trackType = MEDIA_TRACK_TYPE_VIDEO; + } else if (mType == TYPE_SUBS) { + trackType = MEDIA_TRACK_TYPE_SUBTITLE; + } else { + trackType = MEDIA_TRACK_TYPE_UNKNOWN; + } + format->setInt32("type", trackType); + + const Media &item = mMediaItems.itemAt(index); + const char *lang = item.mLanguage.empty() ? "und" : item.mLanguage.c_str(); + format->setString("language", lang); + + if (mType == TYPE_SUBS) { + // TO-DO: pass in a MediaFormat instead + format->setString("mime", MEDIA_MIMETYPE_TEXT_VTT); + format->setInt32("auto", !!(item.mFlags & MediaGroup::FLAG_AUTOSELECT)); + format->setInt32("default", !!(item.mFlags & MediaGroup::FLAG_DEFAULT)); + format->setInt32("forced", !!(item.mFlags & MediaGroup::FLAG_FORCED)); } -} -size_t M3UParser::MediaGroup::countTracks() const { - return mMediaItems.size(); + return format; } bool M3UParser::MediaGroup::getActiveURI(AString *uri) const { @@ -238,6 +246,7 @@ M3UParser::M3UParser( mIsVariantPlaylist(false), mIsComplete(false), mIsEvent(false), + mDiscontinuitySeq(0), mSelectedIndex(-1) { mInitCheck = parse(data, size); } @@ -265,6 +274,10 @@ bool M3UParser::isEvent() const { return mIsEvent; } +size_t M3UParser::getDiscontinuitySeq() const { + return mDiscontinuitySeq; +} + sp<AMessage> M3UParser::meta() { return mMeta; } @@ -319,17 +332,24 @@ status_t M3UParser::selectTrack(size_t index, bool select) { return INVALID_OPERATION; } -status_t M3UParser::getTrackInfo(Parcel* reply) const { +size_t M3UParser::getTrackCount() const { size_t trackCount = 0; for (size_t i = 0; i < mMediaGroups.size(); ++i) { trackCount += mMediaGroups.valueAt(i)->countTracks(); } - reply->writeInt32(trackCount); + return trackCount; +} - for (size_t i = 0; i < mMediaGroups.size(); ++i) { - mMediaGroups.valueAt(i)->getTrackInfo(reply); +sp<AMessage> M3UParser::getTrackInfo(size_t index) const { + for (size_t i = 0, ii = index; i < mMediaGroups.size(); ++i) { + sp<MediaGroup> group = mMediaGroups.valueAt(i); + size_t tracks = group->countTracks(); + if (ii < tracks) { + return group->getTrackInfo(ii); + } + ii -= tracks; } - return OK; + return NULL; } ssize_t M3UParser::getSelectedIndex() const { @@ -552,6 +572,12 @@ status_t M3UParser::parse(const void *_data, size_t size) { } } else if (line.startsWith("#EXT-X-MEDIA")) { err = parseMedia(line); + } else if (line.startsWith("#EXT-X-DISCONTINUITY-SEQUENCE")) { + size_t seq; + err = parseDiscontinuitySequence(line, &seq); + if (err == OK) { + mDiscontinuitySeq = seq; + } } if (err != OK) { @@ -806,7 +832,8 @@ status_t M3UParser::parseCipherInfo( if (MakeURL(baseURI.c_str(), val.c_str(), &absURI)) { val = absURI; } else { - ALOGE("failed to make absolute url for <URL suppressed>."); + ALOGE("failed to make absolute url for %s.", + uriDebugString(baseURI).c_str()); } } @@ -1094,6 +1121,30 @@ status_t M3UParser::parseMedia(const AString &line) { } // static +status_t M3UParser::parseDiscontinuitySequence(const AString &line, size_t *seq) { + ssize_t colonPos = line.find(":"); + + if (colonPos < 0) { + return ERROR_MALFORMED; + } + + int32_t x; + status_t err = ParseInt32(line.c_str() + colonPos + 1, &x); + if (err != OK) { + return err; + } + + if (x < 0) { + return ERROR_MALFORMED; + } + + if (seq) { + *seq = x; + } + return OK; +} + +// static status_t M3UParser::ParseInt32(const char *s, int32_t *x) { char *end; long lval = strtol(s, &end, 10); diff --git a/media/libstagefright/httplive/M3UParser.h b/media/libstagefright/httplive/M3UParser.h index ccd6556..d588afe 100644 --- a/media/libstagefright/httplive/M3UParser.h +++ b/media/libstagefright/httplive/M3UParser.h @@ -34,6 +34,7 @@ struct M3UParser : public RefBase { bool isVariantPlaylist() const; bool isComplete() const; bool isEvent() const; + size_t getDiscontinuitySeq() const; sp<AMessage> meta(); @@ -42,7 +43,8 @@ struct M3UParser : public RefBase { void pickRandomMediaItems(); status_t selectTrack(size_t index, bool select); - status_t getTrackInfo(Parcel* reply) const; + size_t getTrackCount() const; + sp<AMessage> getTrackInfo(size_t index) const; ssize_t getSelectedIndex() const; bool getTypeURI(size_t index, const char *key, AString *uri) const; @@ -65,6 +67,7 @@ private: bool mIsVariantPlaylist; bool mIsComplete; bool mIsEvent; + size_t mDiscontinuitySeq; sp<AMessage> mMeta; Vector<Item> mItems; @@ -93,6 +96,8 @@ private: status_t parseMedia(const AString &line); + static status_t parseDiscontinuitySequence(const AString &line, size_t *seq); + static status_t ParseInt32(const char *s, int32_t *x); static status_t ParseDouble(const char *s, double *x); diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp index 513f114..82a4c39 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.cpp +++ b/media/libstagefright/httplive/PlaylistFetcher.cpp @@ -49,7 +49,7 @@ namespace android { // static const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; -const int32_t PlaylistFetcher::kDownloadBlockSize = 192; +const int32_t PlaylistFetcher::kDownloadBlockSize = 2048; const int32_t PlaylistFetcher::kNumSkipFrames = 10; PlaylistFetcher::PlaylistFetcher( @@ -62,18 +62,21 @@ PlaylistFetcher::PlaylistFetcher( mURI(uri), mStreamTypeMask(0), mStartTimeUs(-1ll), - mMinStartTimeUs(0ll), - mStopParams(NULL), + mSegmentStartTimeUs(-1ll), + mDiscontinuitySeq(-1ll), + mStartTimeUsRelative(false), mLastPlaylistFetchTimeUs(-1ll), mSeqNumber(-1), mNumRetries(0), mStartup(true), + mAdaptive(false), mPrepared(false), mNextPTSTimeUs(-1ll), mMonitorQueueGeneration(0), mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), mFirstPTSValid(false), - mAbsoluteTimeAnchorUs(0ll) { + mAbsoluteTimeAnchorUs(0ll), + mVideoBuffer(new AnotherPacketSource(NULL)) { memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); mStartTimeUsNotify->setInt32("what", kWhatStartedAt); mStartTimeUsNotify->setInt32("streamMask", 0); @@ -317,7 +320,7 @@ void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { maxDelayUs = minDelayUs; } if (delayUs > maxDelayUs) { - ALOGV("Need to refresh playlist in %lld", maxDelayUs); + ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs); delayUs = maxDelayUs; } sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); @@ -334,8 +337,9 @@ void PlaylistFetcher::startAsync( const sp<AnotherPacketSource> &videoSource, const sp<AnotherPacketSource> &subtitleSource, int64_t startTimeUs, - int64_t minStartTimeUs, - int32_t startSeqNumberHint) { + int64_t segmentStartTimeUs, + int32_t startDiscontinuitySeq, + bool adaptive) { sp<AMessage> msg = new AMessage(kWhatStart, id()); uint32_t streamTypeMask = 0ul; @@ -357,8 +361,9 @@ void PlaylistFetcher::startAsync( msg->setInt32("streamTypeMask", streamTypeMask); msg->setInt64("startTimeUs", startTimeUs); - msg->setInt64("minStartTimeUs", minStartTimeUs); - msg->setInt32("startSeqNumberHint", startSeqNumberHint); + msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); + msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); + msg->setInt32("adaptive", adaptive); msg->post(); } @@ -366,9 +371,9 @@ void PlaylistFetcher::pauseAsync() { (new AMessage(kWhatPause, id()))->post(); } -void PlaylistFetcher::stopAsync(bool selfTriggered) { +void PlaylistFetcher::stopAsync(bool clear) { sp<AMessage> msg = new AMessage(kWhatStop, id()); - msg->setInt32("selfTriggered", selfTriggered); + msg->setInt32("clear", clear); msg->post(); } @@ -448,10 +453,13 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); int64_t startTimeUs; - int32_t startSeqNumberHint; + int64_t segmentStartTimeUs; + int32_t startDiscontinuitySeq; + int32_t adaptive; CHECK(msg->findInt64("startTimeUs", &startTimeUs)); - CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs)); - CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint)); + CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); + CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); + CHECK(msg->findInt32("adaptive", &adaptive)); if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { void *ptr; @@ -481,16 +489,16 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { } mStreamTypeMask = streamTypeMask; - mStartTimeUs = startTimeUs; - if (mStartTimeUs >= 0ll) { + mSegmentStartTimeUs = segmentStartTimeUs; + mDiscontinuitySeq = startDiscontinuitySeq; + + if (startTimeUs >= 0) { + mStartTimeUs = startTimeUs; mSeqNumber = -1; mStartup = true; mPrepared = false; - } - - if (startSeqNumberHint >= 0) { - mSeqNumber = startSeqNumberHint; + mAdaptive = adaptive; } postMonitorQueue(); @@ -505,11 +513,9 @@ void PlaylistFetcher::onPause() { void PlaylistFetcher::onStop(const sp<AMessage> &msg) { cancelMonitorQueue(); - int32_t selfTriggered; - CHECK(msg->findInt32("selfTriggered", &selfTriggered)); - if (!selfTriggered) { - // Self triggered stops only happen during switching, in which case we do not want - // to clear the discontinuities queued at the end of packet sources. + int32_t clear; + CHECK(msg->findInt32("clear", &clear)); + if (clear) { for (size_t i = 0; i < mPacketSources.size(); i++) { sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); packetSource->clear(); @@ -551,15 +557,16 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { } // Don't resume if we would stop within a resume threshold. + int32_t discontinuitySeq; int64_t latestTimeUs = 0, stopTimeUs = 0; - sp<AMessage> latestMeta = packetSource->getLatestMeta(); + sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta(); if (latestMeta != NULL - && (latestMeta->findInt64("timeUs", &latestTimeUs) - && params->findInt64(stopKey, &stopTimeUs))) { - int64_t diffUs = stopTimeUs - latestTimeUs; - if (diffUs < resumeThreshold(latestMeta)) { - stop = true; - } + && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq) + && discontinuitySeq == mDiscontinuitySeq + && latestMeta->findInt64("timeUs", &latestTimeUs) + && params->findInt64(stopKey, &stopTimeUs) + && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) { + stop = true; } } @@ -567,7 +574,7 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { for (size_t i = 0; i < mPacketSources.size(); i++) { mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); } - stopAsync(/* selfTriggered = */ true); + stopAsync(/* clear = */ false); return OK; } @@ -587,7 +594,10 @@ void PlaylistFetcher::notifyError(status_t err) { void PlaylistFetcher::queueDiscontinuity( ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { for (size_t i = 0; i < mPacketSources.size(); ++i) { - mPacketSources.valueAt(i)->queueDiscontinuity(type, extra); + // do not discard buffer upon #EXT-X-DISCONTINUITY tag + // (seek will discard buffer by abandoning old fetchers) + mPacketSources.valueAt(i)->queueDiscontinuity( + type, extra, false /* discard */); } } @@ -628,7 +638,7 @@ void PlaylistFetcher::onMonitorQueue() { int64_t bufferedStreamDurationUs = mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); - ALOGV("buffered %lld for stream %d", + ALOGV("buffered %" PRId64 " for stream %d", bufferedStreamDurationUs, mPacketSources.keyAt(i)); if (bufferedStreamDurationUs > bufferedDurationUs) { bufferedDurationUs = bufferedStreamDurationUs; @@ -641,7 +651,7 @@ void PlaylistFetcher::onMonitorQueue() { if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) { mPrepared = true; - ALOGV("prepared, buffered=%lld > %lld", + ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "", bufferedDurationUs, targetDurationUs); sp<AMessage> msg = mNotify->dup(); msg->setInt32("what", kWhatTemporarilyDoneFetching); @@ -649,7 +659,7 @@ void PlaylistFetcher::onMonitorQueue() { } if (finalResult == OK && downloadMore) { - ALOGV("monitoring, buffered=%lld < %lld", + ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "", bufferedDurationUs, durationToBufferUs); // delay the next download slightly; hopefully this gives other concurrent fetchers // a better chance to run. @@ -665,7 +675,7 @@ void PlaylistFetcher::onMonitorQueue() { msg->post(); int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; - ALOGV("pausing for %lld, buffered=%lld > %lld", + ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "", delayUs, bufferedDurationUs, durationToBufferUs); // :TRICKY: need to enforce minimum delay because the delay to // refresh the playlist will become 0 @@ -722,8 +732,7 @@ void PlaylistFetcher::onDownloadNext() { firstSeqNumberInPlaylist = 0; } - bool seekDiscontinuity = false; - bool explicitDiscontinuity = false; + bool discontinuity = false; const int32_t lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; @@ -734,26 +743,50 @@ void PlaylistFetcher::onDownloadNext() { mSeqNumber = lastSeqNumberInPlaylist; } + if (mDiscontinuitySeq < 0) { + mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); + } + if (mSeqNumber < 0) { CHECK_GE(mStartTimeUs, 0ll); - if (mPlaylist->isComplete() || mPlaylist->isEvent()) { - mSeqNumber = getSeqNumberForTime(mStartTimeUs); - ALOGV("Initial sequence number for time %lld is %ld from (%ld .. %ld)", + if (mSegmentStartTimeUs < 0) { + if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { + // If this is a live session, start 3 segments from the end on connect + mSeqNumber = lastSeqNumberInPlaylist - 3; + if (mSeqNumber < firstSeqNumberInPlaylist) { + mSeqNumber = firstSeqNumberInPlaylist; + } + } else { + mSeqNumber = getSeqNumberForTime(mStartTimeUs); + mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber); + } + mStartTimeUsRelative = true; + ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)", mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); } else { - // If this is a live session, start 3 segments from the end. - mSeqNumber = lastSeqNumberInPlaylist - 3; + mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs); + if (mAdaptive) { + // avoid double fetch/decode + mSeqNumber += 1; + } + ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq); + if (mSeqNumber < minSeq) { + mSeqNumber = minSeq; + } + if (mSeqNumber < firstSeqNumberInPlaylist) { mSeqNumber = firstSeqNumberInPlaylist; } - ALOGV("Initial sequence number for live event %ld from (%ld .. %ld)", + + if (mSeqNumber > lastSeqNumberInPlaylist) { + mSeqNumber = lastSeqNumberInPlaylist; + } + ALOGV("Initial sequence number for live event %d from (%d .. %d)", mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); } - - mStartTimeUs = -1ll; } if (mSeqNumber < firstSeqNumberInPlaylist @@ -772,7 +805,8 @@ void PlaylistFetcher::onDownloadNext() { if (delayUs > kMaxMonitorDelayUs) { delayUs = kMaxMonitorDelayUs; } - ALOGV("sequence number high: %ld from (%ld .. %ld), monitor in %lld (retry=%d)", + ALOGV("sequence number high: %d from (%d .. %d), " + "monitor in %" PRId64 " (retry=%d)", mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist, delayUs, mNumRetries); postMonitorQueue(delayUs); @@ -790,7 +824,7 @@ void PlaylistFetcher::onDownloadNext() { if (mSeqNumber < firstSeqNumberInPlaylist) { mSeqNumber = firstSeqNumberInPlaylist; } - explicitDiscontinuity = true; + discontinuity = true; // fall through } else { @@ -815,7 +849,8 @@ void PlaylistFetcher::onDownloadNext() { int32_t val; if (itemMeta->findInt32("discontinuity", &val) && val != 0) { - explicitDiscontinuity = true; + mDiscontinuitySeq++; + discontinuity = true; } int64_t range_offset, range_length; @@ -846,6 +881,7 @@ void PlaylistFetcher::onDownloadNext() { } // block-wise download + bool startup = mStartup; ssize_t bytesRead; do { bytesRead = mSession->fetchFile( @@ -875,7 +911,7 @@ void PlaylistFetcher::onDownloadNext() { return; } - if (mStartup || seekDiscontinuity || explicitDiscontinuity) { + if (startup || discontinuity) { // Signal discontinuity. if (mPlaylist->isComplete() || mPlaylist->isEvent()) { @@ -885,16 +921,17 @@ void PlaylistFetcher::onDownloadNext() { mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); } - if (seekDiscontinuity || explicitDiscontinuity) { - ALOGI("queueing discontinuity (seek=%d, explicit=%d)", - seekDiscontinuity, explicitDiscontinuity); + if (discontinuity) { + ALOGI("queueing discontinuity (explicit=%d)", discontinuity); queueDiscontinuity( - explicitDiscontinuity - ? ATSParser::DISCONTINUITY_FORMATCHANGE - : ATSParser::DISCONTINUITY_SEEK, + ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */); + + discontinuity = false; } + + startup = false; } err = OK; @@ -914,23 +951,19 @@ void PlaylistFetcher::onDownloadNext() { } if (err == -EAGAIN) { - // bad starting sequence number hint + // starting sequence number too low + mTSParser.clear(); postMonitorQueue(); return; - } - - if (err == ERROR_OUT_OF_RANGE) { + } else if (err == ERROR_OUT_OF_RANGE) { // reached stopping point - stopAsync(/* selfTriggered = */ true); + stopAsync(/* clear = */ false); return; - } - - if (err != OK) { + } else if (err != OK) { notifyError(err); return; } - mStartup = false; } while (bytesRead != 0); if (bufferStartsWithTsSyncByte(buffer)) { @@ -990,11 +1023,44 @@ void PlaylistFetcher::onDownloadNext() { return; } + mStartup = false; ++mSeqNumber; postMonitorQueue(); } +int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { + int32_t firstSeqNumberInPlaylist; + if (mPlaylist->meta() == NULL + || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { + firstSeqNumberInPlaylist = 0; + } + + size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); + if (discontinuitySeq < curDiscontinuitySeq) { + return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1); + } + + size_t index = 0; + while (index < mPlaylist->size()) { + sp<AMessage> itemMeta; + CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); + + int64_t discontinuity; + if (itemMeta->findInt64("discontinuity", &discontinuity)) { + curDiscontinuitySeq++; + } + + if (curDiscontinuitySeq == discontinuitySeq) { + return firstSeqNumberInPlaylist + index; + } + + ++index; + } + + return firstSeqNumberInPlaylist + mPlaylist->size(); +} + int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { int32_t firstSeqNumberInPlaylist; if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( @@ -1027,6 +1093,23 @@ int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { return firstSeqNumberInPlaylist + index; } +const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties( + const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) { + sp<MetaData> format = source->getFormat(); + if (format != NULL) { + // for simplicity, store a reference to the format in each unit + accessUnit->meta()->setObject("format", format); + } + + if (discard) { + accessUnit->meta()->setInt32("discard", discard); + } + + accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); + accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); + return accessUnit; +} + status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { if (mTSParser == NULL) { // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. @@ -1042,7 +1125,9 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu mTSParser->signalDiscontinuity( ATSParser::DISCONTINUITY_SEEK, extra); + mAbsoluteTimeAnchorUs = mNextPTSTimeUs; mNextPTSTimeUs = -1ll; + mFirstPTSValid = false; } size_t offset = 0; @@ -1102,21 +1187,23 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu && source->dequeueAccessUnit(&accessUnit) == OK) { CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); - if (mMinStartTimeUs > 0) { - if (timeUs < mMinStartTimeUs) { - // TODO untested path - // try a later ts - int32_t targetDuration; - mPlaylist->meta()->findInt32("target-duration", &targetDuration); - int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration; - if (incr == 0) { - // increment mSeqNumber by at least one - incr = 1; + + if (mStartup) { + if (!mFirstPTSValid) { + mFirstTimeUs = timeUs; + mFirstPTSValid = true; + } + if (mStartTimeUsRelative) { + timeUs -= mFirstTimeUs; + if (timeUs < 0) { + timeUs = 0; + } + } else if (mAdaptive && timeUs > mStartTimeUs) { + int32_t seq; + if (mStartTimeUsNotify != NULL + && !mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) { + mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); } - mSeqNumber += incr; - err = -EAGAIN; - break; - } else { int64_t startTimeUs; if (mStartTimeUsNotify != NULL && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) { @@ -1133,12 +1220,51 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu } } } + + if (timeUs < mStartTimeUs) { + if (mAdaptive) { + int32_t targetDuration; + mPlaylist->meta()->findInt32("target-duration", &targetDuration); + int32_t incr = (mStartTimeUs - timeUs) / 1000000 / targetDuration; + if (incr == 0) { + // increment mSeqNumber by at least one + incr = 1; + } + mSeqNumber += incr; + err = -EAGAIN; + break; + } else { + // buffer up to the closest preceding IDR frame + ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us", + timeUs, mStartTimeUs); + const char *mime; + sp<MetaData> format = source->getFormat(); + bool isAvc = false; + if (format != NULL && format->findCString(kKeyMIMEType, &mime) + && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) { + isAvc = true; + } + if (isAvc && IsIDR(accessUnit)) { + mVideoBuffer->clear(); + } + if (isAvc) { + mVideoBuffer->queueAccessUnit(accessUnit); + } + + continue; + } + } } if (mStopParams != NULL) { // Queue discontinuity in original stream. + int32_t discontinuitySeq; int64_t stopTimeUs; - if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) { + if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) + || discontinuitySeq > mDiscontinuitySeq + || !mStopParams->findInt64(key, &stopTimeUs) + || (discontinuitySeq == mDiscontinuitySeq + && timeUs >= stopTimeUs)) { packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); mStreamTypeMask &= ~stream; mPacketSources.removeItemsAt(i); @@ -1147,15 +1273,18 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu } // Note that we do NOT dequeue any discontinuities except for format change. - - // for simplicity, store a reference to the format in each unit - sp<MetaData> format = source->getFormat(); - if (format != NULL) { - accessUnit->meta()->setObject("format", format); + if (stream == LiveSession::STREAMTYPE_VIDEO) { + const bool discard = true; + status_t status; + while (mVideoBuffer->hasBufferAvailable(&status)) { + sp<ABuffer> videoBuffer; + mVideoBuffer->dequeueAccessUnit(&videoBuffer); + setAccessUnitProperties(videoBuffer, source, discard); + packetSource->queueAccessUnit(videoBuffer); + } } - // Stash the sequence number so we can hint future playlist where to start at. - accessUnit->meta()->setInt32("seq", mSeqNumber); + setAccessUnitProperties(accessUnit, source); packetSource->queueAccessUnit(accessUnit); } @@ -1181,9 +1310,35 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu return OK; } +/* static */ +bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( + const sp<ABuffer> &buffer) { + size_t pos = 0; + + // skip possible BOM + if (buffer->size() >= pos + 3 && + !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { + pos += 3; + } + + // accept WEBVTT followed by SPACE, TAB or (CR) LF + if (buffer->size() < pos + 6 || + memcmp("WEBVTT", buffer->data() + pos, 6)) { + return false; + } + pos += 6; + + if (buffer->size() == pos) { + return true; + } + + uint8_t sep = buffer->data()[pos]; + return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; +} + status_t PlaylistFetcher::extractAndQueueAccessUnits( const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { - if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) { + if (bufferStartsWithWebVTTMagicSequence(buffer)) { if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { ALOGE("This stream only contains subtitles."); return ERROR_MALFORMED; @@ -1196,7 +1351,8 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( CHECK(itemMeta->findInt64("durationUs", &durationUs)); buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); buffer->meta()->setInt64("durationUs", durationUs); - buffer->meta()->setInt32("seq", mSeqNumber); + buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); + buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); packetSource->queueAccessUnit(buffer); return OK; @@ -1262,14 +1418,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( firstID3Tag = false; } - if (!mFirstPTSValid) { - mFirstPTSValid = true; - mFirstPTS = PTS; - } - PTS -= mFirstPTS; - - int64_t timeUs = (PTS * 100ll) / 9ll + mAbsoluteTimeAnchorUs; - if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { ALOGW("This stream only contains audio data!"); @@ -1312,6 +1460,12 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( int32_t sampleRate; CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); + int64_t timeUs = (PTS * 100ll) / 9ll; + if (!mFirstPTSValid) { + mFirstPTSValid = true; + mFirstTimeUs = timeUs; + } + size_t offset = 0; while (offset < buffer->size()) { const uint8_t *adtsHeader = buffer->data() + offset; @@ -1336,19 +1490,32 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( CHECK_LE(offset + aac_frame_length, buffer->size()); - sp<ABuffer> unit = new ABuffer(aac_frame_length); - memcpy(unit->data(), adtsHeader, aac_frame_length); - int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; - unit->meta()->setInt64("timeUs", unitTimeUs); + offset += aac_frame_length; // Each AAC frame encodes 1024 samples. numSamples += 1024; - unit->meta()->setInt32("seq", mSeqNumber); - packetSource->queueAccessUnit(unit); + if (mStartup) { + int64_t startTimeUs = unitTimeUs; + if (mStartTimeUsRelative) { + startTimeUs -= mFirstTimeUs; + if (startTimeUs < 0) { + startTimeUs = 0; + } + } + if (startTimeUs < mStartTimeUs) { + continue; + } + } - offset += aac_frame_length; + sp<ABuffer> unit = new ABuffer(aac_frame_length); + memcpy(unit->data(), adtsHeader, aac_frame_length); + + unit->meta()->setInt64("timeUs", unitTimeUs); + unit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); + unit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); + packetSource->queueAccessUnit(unit); } return OK; diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h index 7e21523..daefb26 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.h +++ b/media/libstagefright/httplive/PlaylistFetcher.h @@ -57,13 +57,15 @@ struct PlaylistFetcher : public AHandler { const sp<AnotherPacketSource> &audioSource, const sp<AnotherPacketSource> &videoSource, const sp<AnotherPacketSource> &subtitleSource, - int64_t startTimeUs = -1ll, - int64_t minStartTimeUs = 0ll /* start after this timestamp */, - int32_t startSeqNumberHint = -1 /* try starting at this sequence number */); + int64_t startTimeUs = -1ll, // starting timestamps + int64_t segmentStartTimeUs = -1ll, // starting position within playlist + // startTimeUs!=segmentStartTimeUs only when playlist is live + int32_t startDiscontinuitySeq = 0, + bool adaptive = false); void pauseAsync(); - void stopAsync(bool selfTriggered = false); + void stopAsync(bool clear = true); void resumeUntilAsync(const sp<AMessage> ¶ms); @@ -91,6 +93,7 @@ private: static const int32_t kNumSkipFrames; static bool bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer); + static bool bufferStartsWithWebVTTMagicSequence(const sp<ABuffer>& buffer); // notifications to mSession sp<AMessage> mNotify; @@ -101,7 +104,9 @@ private: uint32_t mStreamTypeMask; int64_t mStartTimeUs; - int64_t mMinStartTimeUs; // start fetching no earlier than this value + int64_t mSegmentStartTimeUs; + ssize_t mDiscontinuitySeq; + bool mStartTimeUsRelative; sp<AMessage> mStopParams; // message containing the latest timestamps we should fetch. KeyedVector<LiveSession::StreamType, sp<AnotherPacketSource> > @@ -114,6 +119,7 @@ private: int32_t mSeqNumber; int32_t mNumRetries; bool mStartup; + bool mAdaptive; bool mPrepared; int64_t mNextPTSTimeUs; @@ -133,7 +139,9 @@ private: bool mFirstPTSValid; uint64_t mFirstPTS; + int64_t mFirstTimeUs; int64_t mAbsoluteTimeAnchorUs; + sp<AnotherPacketSource> mVideoBuffer; // Stores the initialization vector to decrypt the next block of cipher text, which can // either be derived from the sequence number, read from the manifest, or copied from @@ -172,6 +180,10 @@ private: // Resume a fetcher to continue until the stopping point stored in msg. status_t onResumeUntil(const sp<AMessage> &msg); + const sp<ABuffer> &setAccessUnitProperties( + const sp<ABuffer> &accessUnit, + const sp<AnotherPacketSource> &source, + bool discard = false); status_t extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer); status_t extractAndQueueAccessUnits( @@ -182,6 +194,8 @@ private: void queueDiscontinuity( ATSParser::DiscontinuityType type, const sp<AMessage> &extra); + int32_t getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const; + int32_t getSeqNumberForDiscontinuity(size_t discontinuitySeq) const; int32_t getSeqNumberForTime(int64_t timeUs) const; void updateDuration(); |