From 1543d3c735a5ba4ddfcf8ab644575df13c7e30a9 Mon Sep 17 00:00:00 2001 From: Robert Shih Date: Thu, 20 Feb 2014 13:07:26 -0800 Subject: Initial HLS seamless switch implementation. Bug: 11854054 Change-Id: I75fc2a258111295039ac13cc37e407df25891dd2 --- media/libstagefright/httplive/LiveSession.cpp | 280 +++++++++++++++++++--- media/libstagefright/httplive/LiveSession.h | 36 +++ media/libstagefright/httplive/PlaylistFetcher.cpp | 251 +++++++++++++++++-- media/libstagefright/httplive/PlaylistFetcher.h | 22 +- 4 files changed, 541 insertions(+), 48 deletions(-) (limited to 'media/libstagefright') diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index 95779c4..2af4682 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -40,6 +40,8 @@ #include #include +#include + #include #include #include @@ -56,10 +58,14 @@ LiveSession::LiveSession( mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), mPrevBandwidthIndex(-1), mStreamMask(0), + mNewStreamMask(0), + mSwapMask(0), mCheckBandwidthGeneration(0), + mSwitchGeneration(0), mLastDequeuedTimeUs(0ll), mRealTimeBaseUs(0ll), mReconfigurationInProgress(false), + mSwitchInProgress(false), mDisconnectReplyID(0) { mStreams[kAudioIndex] = StreamItem("audio"); @@ -68,16 +74,37 @@ LiveSession::LiveSession( for (size_t i = 0; i < kMaxStreams; ++i) { mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); + mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); } } LiveSession::~LiveSession() { } +sp LiveSession::createFormatChangeBuffer(bool swap) { + ABuffer *discontinuity = new ABuffer(0); + discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); + discontinuity->meta()->setInt32("swapPacketSource", swap); + discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); + discontinuity->meta()->setInt64("timeUs", -1); + return discontinuity; +} + +void LiveSession::swapPacketSource(StreamType stream) { + sp &aps = mPacketSources.editValueFor(stream); + sp &aps2 = mPacketSources2.editValueFor(stream); + sp tmp = aps; + aps = aps2; + aps2 = tmp; + aps2->clear(); +} + status_t LiveSession::dequeueAccessUnit( StreamType stream, sp *accessUnit) { if (!(mStreamMask & stream)) { - return UNKNOWN_ERROR; + // return -EWOULDBLOCK to avoid halting the decoder + // when switching between audio/video and audio only. + return -EWOULDBLOCK; } sp packetSource = mPacketSources.valueFor(stream); @@ -117,6 +144,25 @@ status_t LiveSession::dequeueAccessUnit( streamStr, type, extra == NULL ? "NULL" : extra->debugString().c_str()); + + int32_t swap; + if (type == ATSParser::DISCONTINUITY_FORMATCHANGE + && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) + && swap) { + + int32_t switchGeneration; + CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); + { + Mutex::Autolock lock(mSwapMutex); + if (switchGeneration == mSwitchGeneration) { + swapPacketSource(stream); + sp msg = new AMessage(kWhatSwapped, id()); + msg->setInt32("stream", stream); + msg->setInt32("switchGeneration", switchGeneration); + msg->post(); + } + } + } } else if (err == OK) { if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { int64_t timeUs; @@ -138,6 +184,7 @@ status_t LiveSession::dequeueAccessUnit( } status_t LiveSession::getStreamFormat(StreamType stream, sp *format) { + // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. if (!(mStreamMask & stream)) { return UNKNOWN_ERROR; } @@ -234,7 +281,12 @@ void LiveSession::onMessageReceived(const sp &msg) { if (what == PlaylistFetcher::kWhatStopped) { AString uri; CHECK(msg->findString("uri", &uri)); - mFetcherInfos.removeItem(uri); + if (mFetcherInfos.removeItem(uri) < 0) { + // ignore duplicated kWhatStopped messages. + break; + } + + tryToFinishBandwidthSwitch(); } if (mContinuation != NULL) { @@ -270,6 +322,8 @@ void LiveSession::onMessageReceived(const sp &msg) { postPrepared(err); } + cancelBandwidthSwitch(); + mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); @@ -308,6 +362,27 @@ void LiveSession::onMessageReceived(const sp &msg) { break; } + case PlaylistFetcher::kWhatStartedAt: + { + int32_t switchGeneration; + CHECK(msg->findInt32("switchGeneration", &switchGeneration)); + + if (switchGeneration != mSwitchGeneration) { + break; + } + + // Resume fetcher for the original variant; the resumed fetcher should + // continue until the timestamps found in msg, which is stored by the + // new fetcher to indicate where the new variant has started buffering. + for (size_t i = 0; i < mFetcherInfos.size(); i++) { + const FetcherInfo info = mFetcherInfos.valueAt(i); + if (info.mToBeRemoved) { + info.mFetcher->resumeUntilAsync(msg); + } + } + break; + } + default: TRESPASS(); } @@ -352,6 +427,11 @@ void LiveSession::onMessageReceived(const sp &msg) { break; } + case kWhatSwapped: + { + onSwapped(msg); + break; + } default: TRESPASS(); break; @@ -462,6 +542,10 @@ void LiveSession::finishDisconnect() { // during disconnection either. cancelCheckBandwidthEvent(); + // Protect mPacketSources from a swapPacketSource race condition through disconnect. + // (finishDisconnect, onFinishDisconnect2) + cancelBandwidthSwitch(); + for (size_t i = 0; i < mFetcherInfos.size(); ++i) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); } @@ -501,11 +585,13 @@ sp LiveSession::addFetcher(const char *uri) { sp notify = new AMessage(kWhatFetcherNotify, id()); notify->setString("uri", uri); + notify->setInt32("switchGeneration", mSwitchGeneration); FetcherInfo info; info.mFetcher = new PlaylistFetcher(notify, this, uri); info.mDurationUs = -1ll; info.mIsPrepared = false; + info.mToBeRemoved = false; looper()->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); @@ -845,8 +931,25 @@ status_t LiveSession::selectTrack(size_t index, bool select) { return err; } +bool LiveSession::canSwitchUp() { + // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. + status_t err = OK; + for (size_t i = 0; i < mPacketSources.size(); ++i) { + sp source = mPacketSources.valueAt(i); + int64_t dur = source->getBufferedDurationUs(&err); + if (err == OK && dur > 10000000) { + return true; + } + } + return false; +} + void LiveSession::changeConfiguration( int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { + // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. + // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). + cancelBandwidthSwitch(); + CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; @@ -862,7 +965,8 @@ void LiveSession::changeConfiguration( CHECK_LT(bandwidthIndex, mBandwidthItems.size()); const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); - uint32_t streamMask = 0; + uint32_t streamMask = 0; // streams that should be fetched by the new fetcher + uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher AString URIs[kMaxStreams]; for (size_t i = 0; i < kMaxStreams; ++i) { @@ -880,9 +984,14 @@ void LiveSession::changeConfiguration( // If we're seeking all current fetchers are discarded. if (timeUs < 0ll) { + // delay fetcher removal + discardFetcher = false; + for (size_t j = 0; j < kMaxStreams; ++j) { - if ((streamMask & indexToType(j)) && uri == URIs[j]) { - discardFetcher = false; + StreamType type = indexToType(j); + if ((streamMask & type) && uri == URIs[j]) { + resumeMask |= type; + streamMask &= ~type; } } } @@ -894,8 +1003,15 @@ void LiveSession::changeConfiguration( } } - sp msg = new AMessage(kWhatChangeConfiguration2, id()); + sp msg; + if (timeUs < 0ll) { + // skip onChangeConfiguration2 (decoder destruction) if switching. + msg = new AMessage(kWhatChangeConfiguration3, id()); + } else { + msg = new AMessage(kWhatChangeConfiguration2, id()); + } msg->setInt32("streamMask", streamMask); + msg->setInt32("resumeMask", resumeMask); msg->setInt64("timeUs", timeUs); for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { @@ -978,11 +1094,13 @@ void LiveSession::onChangeConfiguration2(const sp &msg) { } void LiveSession::onChangeConfiguration3(const sp &msg) { + mContinuation.clear(); // All remaining fetchers are still suspended, the player has shutdown // any decoders that needed it. - uint32_t streamMask; + uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); + CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { @@ -991,37 +1109,39 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { } int64_t timeUs; + bool switching = false; CHECK(msg->findInt64("timeUs", &timeUs)); if (timeUs < 0ll) { timeUs = mLastDequeuedTimeUs; + switching = true; } mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; - mStreamMask = streamMask; + mNewStreamMask = streamMask; - // Resume all existing fetchers and assign them packet sources. + // Of all existing fetchers: + // * Resume fetchers that are still needed and assign them original packet sources. + // * Mark otherwise unneeded fetchers for removal. + ALOGV("resuming fetchers for mask 0x%08x", resumeMask); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { const AString &uri = mFetcherInfos.keyAt(i); - uint32_t resumeMask = 0; - sp sources[kMaxStreams]; for (size_t j = 0; j < kMaxStreams; ++j) { - if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { + if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); - resumeMask |= indexToType(j); } } - CHECK_NE(resumeMask, 0u); - - ALOGV("resuming fetchers for mask 0x%08x", resumeMask); - - streamMask &= ~resumeMask; - - mFetcherInfos.valueAt(i).mFetcher->startAsync( - sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); + FetcherInfo &info = mFetcherInfos.editValueAt(i); + if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL + || sources[kSubtitleIndex] != NULL) { + info.mFetcher->startAsync( + sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); + } else { + info.mToBeRemoved = true; + } } // streamMask now only contains the types that need a new fetcher created. @@ -1030,6 +1150,8 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { ALOGV("creating new fetchers for mask 0x%08x", streamMask); } + // Find out when the original fetchers have buffered up to and start the new fetchers + // at a later timestamp. for (size_t i = 0; i < kMaxStreams; i++) { if (!(indexToType(i) & streamMask)) { continue; @@ -1041,12 +1163,40 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { sp fetcher = addFetcher(uri.c_str()); CHECK(fetcher != NULL); + int32_t latestSeq = -1; + int64_t latestTimeUs = 0ll; sp sources[kMaxStreams]; + // TRICKY: looping from i as earlier streams are already removed from streamMask for (size_t j = i; j < kMaxStreams; ++j) { if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); - sources[j]->clear(); + + if (!switching) { + sources[j]->clear(); + } else { + int32_t type, seq; + int64_t srcTimeUs; + sp meta = sources[j]->getLatestMeta(); + + if (meta != NULL && !meta->findInt32("discontinuity", &type)) { + CHECK(meta->findInt32("seq", &seq)); + if (seq > latestSeq) { + latestSeq = seq; + } + CHECK(meta->findInt64("timeUs", &srcTimeUs)); + if (srcTimeUs > latestTimeUs) { + latestTimeUs = srcTimeUs; + } + } + + sources[j] = mPacketSources2.valueFor(indexToType(j)); + sources[j]->clear(); + uint32_t extraStreams = mNewStreamMask & (~mStreamMask); + if (extraStreams & indexToType(j)) { + sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false)); + } + } streamMask &= ~indexToType(j); } @@ -1056,7 +1206,9 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], - timeUs); + timeUs, + latestTimeUs /* min start time(us) */, + latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ ); } // All fetchers have now been started, the configuration change @@ -1065,14 +1217,61 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { scheduleCheckBandwidthEvent(); ALOGV("XXX configuration change completed."); - mReconfigurationInProgress = false; + if (switching) { + mSwitchInProgress = true; + } else { + mStreamMask = mNewStreamMask; + } if (mDisconnectReplyID != 0) { finishDisconnect(); } } +void LiveSession::onSwapped(const sp &msg) { + int32_t switchGeneration; + CHECK(msg->findInt32("switchGeneration", &switchGeneration)); + if (switchGeneration != mSwitchGeneration) { + return; + } + + int32_t stream; + CHECK(msg->findInt32("stream", &stream)); + mSwapMask |= stream; + if (mSwapMask != mStreamMask) { + return; + } + + // Check if new variant contains extra streams. + uint32_t extraStreams = mNewStreamMask & (~mStreamMask); + while (extraStreams) { + StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); + swapPacketSource(extraStream); + extraStreams &= ~extraStream; + } + + tryToFinishBandwidthSwitch(); +} + +// Mark switch done when: +// 1. all old buffers are swapped out, AND +// 2. all old fetchers are removed. +void LiveSession::tryToFinishBandwidthSwitch() { + bool needToRemoveFetchers = false; + for (size_t i = 0; i < mFetcherInfos.size(); ++i) { + if (mFetcherInfos.valueAt(i).mToBeRemoved) { + needToRemoveFetchers = true; + break; + } + } + if (!needToRemoveFetchers && mSwapMask == mStreamMask) { + mStreamMask = mNewStreamMask; + mSwitchInProgress = false; + mSwapMask = 0; + } +} + void LiveSession::scheduleCheckBandwidthEvent() { sp msg = new AMessage(kWhatCheckBandwidth, id()); msg->setInt32("generation", mCheckBandwidthGeneration); @@ -1083,16 +1282,37 @@ void LiveSession::cancelCheckBandwidthEvent() { ++mCheckBandwidthGeneration; } -void LiveSession::onCheckBandwidth() { - if (mReconfigurationInProgress) { - scheduleCheckBandwidthEvent(); - return; +void LiveSession::cancelBandwidthSwitch() { + Mutex::Autolock lock(mSwapMutex); + mSwitchGeneration++; + mSwitchInProgress = false; + mSwapMask = 0; +} + +bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { + if (mReconfigurationInProgress || mSwitchInProgress) { + return false; + } + + if (mPrevBandwidthIndex < 0) { + return true; } + if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { + return false; + } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { + return canSwitchUp(); + } else { + return true; + } +} + +void LiveSession::onCheckBandwidth() { size_t bandwidthIndex = getBandwidthIndex(); - if (mPrevBandwidthIndex < 0 - || bandwidthIndex != (size_t)mPrevBandwidthIndex) { + if (canSwitchBandwidthTo(bandwidthIndex)) { changeConfiguration(-1ll /* timeUs */, bandwidthIndex); + } else { + scheduleCheckBandwidthEvent(); } // Handling the kWhatCheckBandwidth even here does _not_ automatically diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h index c4d125c..91bbcea 100644 --- a/media/libstagefright/httplive/LiveSession.h +++ b/media/libstagefright/httplive/LiveSession.h @@ -83,6 +83,11 @@ struct LiveSession : public AHandler { kWhatPreparationFailed, }; + // create a format-change discontinuity + // + // swap: + // whether is format-change discontinuity should trigger a buffer swap + sp createFormatChangeBuffer(bool swap = true); protected: virtual ~LiveSession(); @@ -101,6 +106,7 @@ private: kWhatChangeConfiguration2 = 'chC2', kWhatChangeConfiguration3 = 'chC3', kWhatFinishDisconnect2 = 'fin2', + kWhatSwapped = 'swap', }; struct BandwidthItem { @@ -112,6 +118,7 @@ private: sp mFetcher; int64_t mDurationUs; bool mIsPrepared; + bool mToBeRemoved; }; struct StreamItem { @@ -146,9 +153,26 @@ private: KeyedVector mFetcherInfos; uint32_t mStreamMask; + // Masks used during reconfiguration: + // mNewStreamMask: streams in the variant playlist we're switching to; + // we don't want to immediately overwrite the original value. + uint32_t mNewStreamMask; + + // mSwapMask: streams that have started to playback content in the new variant playlist; + // we use this to track reconfiguration progress. + uint32_t mSwapMask; + KeyedVector > mPacketSources; + // A second set of packet sources that buffer content for the variant we're switching to. + KeyedVector > mPacketSources2; + + // A mutex used to serialize two sets of events: + // * the swapping of packet sources in dequeueAccessUnit on the player thread, AND + // * a forced bandwidth switch termination in cancelSwitch on the live looper. + Mutex mSwapMutex; int32_t mCheckBandwidthGeneration; + int32_t mSwitchGeneration; size_t mContinuationCounter; sp mContinuation; @@ -157,6 +181,7 @@ private: int64_t mRealTimeBaseUs; bool mReconfigurationInProgress; + bool mSwitchInProgress; uint32_t mDisconnectReplyID; sp addFetcher(const char *uri); @@ -199,16 +224,27 @@ private: void onChangeConfiguration(const sp &msg); void onChangeConfiguration2(const sp &msg); void onChangeConfiguration3(const sp &msg); + void onSwapped(const sp &msg); + void tryToFinishBandwidthSwitch(); void scheduleCheckBandwidthEvent(); void cancelCheckBandwidthEvent(); + // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources + // from being swapped out on stale discontinuities while manipulating + // mPacketSources/mPacketSources2. + void cancelBandwidthSwitch(); + + bool canSwitchBandwidthTo(size_t bandwidthIndex); void onCheckBandwidth(); void finishDisconnect(); void postPrepared(status_t err); + void swapPacketSource(StreamType stream); + bool canSwitchUp(); + DISALLOW_EVIL_CONSTRUCTORS(LiveSession); }; diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp index 030cbde..1a02e85 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.cpp +++ b/media/libstagefright/httplive/PlaylistFetcher.cpp @@ -48,16 +48,20 @@ namespace android { // static const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; +const int32_t PlaylistFetcher::kNumSkipFrames = 10; PlaylistFetcher::PlaylistFetcher( const sp ¬ify, const sp &session, const char *uri) : mNotify(notify), + mStartTimeUsNotify(notify->dup()), mSession(session), mURI(uri), mStreamTypeMask(0), mStartTimeUs(-1ll), + mMinStartTimeUs(0ll), + mStopParams(NULL), mLastPlaylistFetchTimeUs(-1ll), mSeqNumber(-1), mNumRetries(0), @@ -69,6 +73,8 @@ PlaylistFetcher::PlaylistFetcher( mFirstPTSValid(false), mAbsoluteTimeAnchorUs(0ll) { memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); + mStartTimeUsNotify->setInt32("what", kWhatStartedAt); + mStartTimeUsNotify->setInt32("streamMask", 0); } PlaylistFetcher::~PlaylistFetcher() { @@ -325,7 +331,9 @@ void PlaylistFetcher::startAsync( const sp &audioSource, const sp &videoSource, const sp &subtitleSource, - int64_t startTimeUs) { + int64_t startTimeUs, + int64_t minStartTimeUs, + int32_t startSeqNumberHint) { sp msg = new AMessage(kWhatStart, id()); uint32_t streamTypeMask = 0ul; @@ -347,6 +355,8 @@ void PlaylistFetcher::startAsync( msg->setInt32("streamTypeMask", streamTypeMask); msg->setInt64("startTimeUs", startTimeUs); + msg->setInt64("minStartTimeUs", minStartTimeUs); + msg->setInt32("startSeqNumberHint", startSeqNumberHint); msg->post(); } @@ -358,6 +368,12 @@ void PlaylistFetcher::stopAsync() { (new AMessage(kWhatStop, id()))->post(); } +void PlaylistFetcher::resumeUntilAsync(const sp ¶ms) { + AMessage* msg = new AMessage(kWhatResumeUntil, id()); + msg->setMessage("params", params); + msg->post(); +} + void PlaylistFetcher::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatStart: @@ -392,6 +408,7 @@ void PlaylistFetcher::onMessageReceived(const sp &msg) { } case kWhatMonitorQueue: + case kWhatDownloadNext: { int32_t generation; CHECK(msg->findInt32("generation", &generation)); @@ -401,7 +418,17 @@ void PlaylistFetcher::onMessageReceived(const sp &msg) { break; } - onMonitorQueue(); + if (msg->what() == kWhatMonitorQueue) { + onMonitorQueue(); + } else { + onDownloadNext(); + } + break; + } + + case kWhatResumeUntil: + { + onResumeUntil(msg); break; } @@ -417,7 +444,10 @@ status_t PlaylistFetcher::onStart(const sp &msg) { CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); int64_t startTimeUs; + int32_t startSeqNumberHint; CHECK(msg->findInt64("startTimeUs", &startTimeUs)); + CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs)); + CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint)); if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { void *ptr; @@ -455,6 +485,10 @@ status_t PlaylistFetcher::onStart(const sp &msg) { mPrepared = false; } + if (startSeqNumberHint >= 0) { + mSeqNumber = startSeqNumberHint; + } + postMonitorQueue(); return OK; @@ -462,20 +496,70 @@ status_t PlaylistFetcher::onStart(const sp &msg) { void PlaylistFetcher::onPause() { cancelMonitorQueue(); +} + +void PlaylistFetcher::onStop() { + cancelMonitorQueue(); mPacketSources.clear(); mStreamTypeMask = 0; } -void PlaylistFetcher::onStop() { - cancelMonitorQueue(); +// Resume until we have reached the boundary timestamps listed in `msg`; when +// the remaining time is too short (within a resume threshold) stop immediately +// instead. +status_t PlaylistFetcher::onResumeUntil(const sp &msg) { + sp params; + CHECK(msg->findMessage("params", ¶ms)); + + bool stop = false; + for (size_t i = 0; i < mPacketSources.size(); i++) { + sp packetSource = mPacketSources.valueAt(i); + + const char *stopKey; + int streamType = mPacketSources.keyAt(i); + switch (streamType) { + case LiveSession::STREAMTYPE_VIDEO: + stopKey = "timeUsVideo"; + break; - for (size_t i = 0; i < mPacketSources.size(); ++i) { - mPacketSources.valueAt(i)->clear(); + case LiveSession::STREAMTYPE_AUDIO: + stopKey = "timeUsAudio"; + break; + + case LiveSession::STREAMTYPE_SUBTITLES: + stopKey = "timeUsSubtitle"; + break; + + default: + TRESPASS(); + } + + // Don't resume if we would stop within a resume threshold. + int64_t latestTimeUs = 0, stopTimeUs = 0; + sp latestMeta = packetSource->getLatestMeta(); + if (latestMeta != NULL + && (latestMeta->findInt64("timeUs", &latestTimeUs) + && params->findInt64(stopKey, &stopTimeUs))) { + int64_t diffUs = stopTimeUs - latestTimeUs; + if (diffUs < resumeThreshold(latestMeta)) { + stop = true; + } + } } - mPacketSources.clear(); - mStreamTypeMask = 0; + if (stop) { + for (size_t i = 0; i < mPacketSources.size(); i++) { + mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); + } + stopAsync(); + return OK; + } + + mStopParams = params; + postMonitorQueue(); + + return OK; } void PlaylistFetcher::notifyError(status_t err) { @@ -519,8 +603,9 @@ void PlaylistFetcher::onMonitorQueue() { packetSource->getBufferedDurationUs(&finalResult); finalResult = OK; } else { - bool first = true; - + // Use max stream duration to prevent us from waiting on a non-existent stream; + // when we cannot make out from the manifest what streams are included in a playlist + // we might assume extra streams. for (size_t i = 0; i < mPacketSources.size(); ++i) { if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { continue; @@ -528,9 +613,10 @@ void PlaylistFetcher::onMonitorQueue() { int64_t bufferedStreamDurationUs = mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); - if (first || bufferedStreamDurationUs < bufferedDurationUs) { + ALOGV("buffered %lld for stream %d", + bufferedStreamDurationUs, mPacketSources.keyAt(i)); + if (bufferedStreamDurationUs > bufferedDurationUs) { bufferedDurationUs = bufferedStreamDurationUs; - first = false; } } } @@ -550,7 +636,12 @@ void PlaylistFetcher::onMonitorQueue() { if (finalResult == OK && downloadMore) { ALOGV("monitoring, buffered=%lld < %lld", bufferedDurationUs, durationToBufferUs); - onDownloadNext(); + // delay the next download slightly; hopefully this gives other concurrent fetchers + // a better chance to run. + // onDownloadNext(); + sp msg = new AMessage(kWhatDownloadNext, id()); + msg->setInt32("generation", mMonitorQueueGeneration); + msg->post(1000l); } else { // Nothing to do yet, try again in a second. @@ -617,6 +708,12 @@ void PlaylistFetcher::onDownloadNext() { const int32_t lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; + if (mStartup && mSeqNumber >= 0 + && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) { + // in case we guessed wrong during reconfiguration, try fetching the latest content. + mSeqNumber = lastSeqNumberInPlaylist; + } + if (mSeqNumber < 0) { CHECK_GE(mStartTimeUs, 0ll); @@ -762,6 +859,18 @@ void PlaylistFetcher::onDownloadNext() { err = extractAndQueueAccessUnits(buffer, itemMeta); + if (err == -EAGAIN) { + // bad starting sequence number hint + postMonitorQueue(); + return; + } + + if (err == ERROR_OUT_OF_RANGE) { + // reached stopping point + stopAsync(); + return; + } + if (err != OK) { notifyError(err); return; @@ -818,12 +927,15 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( } if (mTSParser == NULL) { - mTSParser = new ATSParser; + // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. + mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); } if (mNextPTSTimeUs >= 0ll) { sp extra = new AMessage; - extra->setInt64(IStreamListener::kKeyMediaTimeUs, mNextPTSTimeUs); + // Since we are using absolute timestamps, signal an offset of 0 to prevent + // ATSParser from skewing the timestamps of access units. + extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); mTSParser->signalDiscontinuity( ATSParser::DISCONTINUITY_SEEK, extra); @@ -842,17 +954,23 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( offset += 188; } + status_t err = OK; for (size_t i = mPacketSources.size(); i-- > 0;) { sp packetSource = mPacketSources.valueAt(i); + const char *key; ATSParser::SourceType type; - switch (mPacketSources.keyAt(i)) { + const LiveSession::StreamType stream = mPacketSources.keyAt(i); + switch (stream) { + case LiveSession::STREAMTYPE_VIDEO: type = ATSParser::VIDEO; + key = "timeUsVideo"; break; case LiveSession::STREAMTYPE_AUDIO: type = ATSParser::AUDIO; + key = "timeUsAudio"; break; case LiveSession::STREAMTYPE_SUBTITLES: @@ -879,19 +997,87 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( continue; } + int64_t timeUs; sp accessUnit; status_t finalResult; while (source->hasBufferAvailable(&finalResult) && source->dequeueAccessUnit(&accessUnit) == OK) { - // Note that we do NOT dequeue any discontinuities. + + CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); + if (mMinStartTimeUs > 0) { + if (timeUs < mMinStartTimeUs) { + // TODO untested path + // try a later ts + int32_t targetDuration; + mPlaylist->meta()->findInt32("target-duration", &targetDuration); + int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration; + if (incr == 0) { + // increment mSeqNumber by at least one + incr = 1; + } + mSeqNumber += incr; + err = -EAGAIN; + break; + } else { + int64_t startTimeUs; + if (mStartTimeUsNotify != NULL + && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) { + mStartTimeUsNotify->setInt64(key, timeUs); + + uint32_t streamMask = 0; + mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); + streamMask |= mPacketSources.keyAt(i); + mStartTimeUsNotify->setInt32("streamMask", streamMask); + + if (streamMask == mStreamTypeMask) { + mStartTimeUsNotify->post(); + mStartTimeUsNotify.clear(); + } + } + } + } + + if (mStopParams != NULL) { + // Queue discontinuity in original stream. + int64_t stopTimeUs; + if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) { + packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); + mStreamTypeMask &= ~stream; + mPacketSources.removeItemsAt(i); + break; + } + } + + // Note that we do NOT dequeue any discontinuities except for format change. // for simplicity, store a reference to the format in each unit sp format = source->getFormat(); if (format != NULL) { accessUnit->meta()->setObject("format", format); } + + // Stash the sequence number so we can hint future fetchers where to start at. + accessUnit->meta()->setInt32("seq", mSeqNumber); packetSource->queueAccessUnit(accessUnit); } + + if (err != OK) { + break; + } + } + + if (err != OK) { + for (size_t i = mPacketSources.size(); i-- > 0;) { + sp packetSource = mPacketSources.valueAt(i); + packetSource->clear(); + } + return err; + } + + if (!mStreamTypeMask) { + // Signal gap is filled between original and new stream. + ALOGV("ERROR OUT OF RANGE"); + return ERROR_OUT_OF_RANGE; } return OK; @@ -908,6 +1094,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( CHECK(itemMeta->findInt64("durationUs", &durationUs)); buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); buffer->meta()->setInt64("durationUs", durationUs); + buffer->meta()->setInt32("seq", mSeqNumber); packetSource->queueAccessUnit(buffer); return OK; @@ -1044,6 +1231,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( // Each AAC frame encodes 1024 samples. numSamples += 1024; + unit->meta()->setInt32("seq", mSeqNumber); packetSource->queueAccessUnit(unit); offset += aac_frame_length; @@ -1071,4 +1259,33 @@ void PlaylistFetcher::updateDuration() { msg->post(); } +int64_t PlaylistFetcher::resumeThreshold(const sp &msg) { + int64_t durationUs, threshold; + if (msg->findInt64("durationUs", &durationUs)) { + return kNumSkipFrames * durationUs; + } + + sp obj; + msg->findObject("format", &obj); + MetaData *format = static_cast(obj.get()); + + const char *mime; + CHECK(format->findCString(kKeyMIMEType, &mime)); + bool audio = !strncasecmp(mime, "audio/", 6); + if (audio) { + // Assumes 1000 samples per frame. + int32_t sampleRate; + CHECK(format->findInt32(kKeySampleRate, &sampleRate)); + return kNumSkipFrames /* frames */ * 1000 /* samples */ + * (1000000 / sampleRate) /* sample duration (us) */; + } else { + int32_t frameRate; + if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) { + return kNumSkipFrames * (1000000 / frameRate); + } + } + + return 500000ll; +} + } // namespace android diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h index ac04a77..2e0349f 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.h +++ b/media/libstagefright/httplive/PlaylistFetcher.h @@ -43,6 +43,7 @@ struct PlaylistFetcher : public AHandler { kWhatTemporarilyDoneFetching, kWhatPrepared, kWhatPreparationFailed, + kWhatStartedAt, }; PlaylistFetcher( @@ -56,12 +57,16 @@ struct PlaylistFetcher : public AHandler { const sp &audioSource, const sp &videoSource, const sp &subtitleSource, - int64_t startTimeUs = -1ll); + int64_t startTimeUs = -1ll, + int64_t minStartTimeUs = 0ll /* start after this timestamp */, + int32_t startSeqNumberHint = -1 /* try starting at this sequence number */); void pauseAsync(); void stopAsync(); + void resumeUntilAsync(const sp ¶ms); + protected: virtual ~PlaylistFetcher(); virtual void onMessageReceived(const sp &msg); @@ -76,17 +81,25 @@ private: kWhatPause = 'paus', kWhatStop = 'stop', kWhatMonitorQueue = 'moni', + kWhatResumeUntil = 'rsme', + kWhatDownloadNext = 'dlnx', }; static const int64_t kMinBufferedDurationUs; static const int64_t kMaxMonitorDelayUs; + static const int32_t kNumSkipFrames; + // notifications to mSession sp mNotify; + sp mStartTimeUsNotify; + sp mSession; AString mURI; uint32_t mStreamTypeMask; int64_t mStartTimeUs; + int64_t mMinStartTimeUs; // start fetching no earlier than this value + sp mStopParams; // message containing the latest timestamps we should fetch. KeyedVector > mPacketSources; @@ -153,6 +166,9 @@ private: void onMonitorQueue(); void onDownloadNext(); + // Resume a fetcher to continue until the stopping point stored in msg. + status_t onResumeUntil(const sp &msg); + status_t extractAndQueueAccessUnits( const sp &buffer, const sp &itemMeta); @@ -165,6 +181,10 @@ private: void updateDuration(); + // Before resuming a fetcher in onResume, check the remaining duration is longer than that + // returned by resumeThreshold. + int64_t resumeThreshold(const sp &msg); + DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher); }; -- cgit v1.1