diff options
author | Robert Shih <robertshih@google.com> | 2014-02-20 13:07:26 -0800 |
---|---|---|
committer | Lajos Molnar <lajos@google.com> | 2014-03-06 16:27:15 -0800 |
commit | 777ee5ed736c8f6c3f7d196ea022f7432bfd23e1 (patch) | |
tree | 90c59cbca76978fb5fc9961cc19f7a21dd5aa499 /media/libstagefright/httplive/LiveSession.cpp | |
parent | 0523da8f16557a85395da86ab76be6d8cb771da0 (diff) | |
download | frameworks_av-777ee5ed736c8f6c3f7d196ea022f7432bfd23e1.zip frameworks_av-777ee5ed736c8f6c3f7d196ea022f7432bfd23e1.tar.gz frameworks_av-777ee5ed736c8f6c3f7d196ea022f7432bfd23e1.tar.bz2 |
Initial HLS seamless switch implementation.
Bug: 11854054
Change-Id: I75fc2a258111295039ac13cc37e407df25891dd2
Diffstat (limited to 'media/libstagefright/httplive/LiveSession.cpp')
-rw-r--r-- | media/libstagefright/httplive/LiveSession.cpp | 280 |
1 files changed, 250 insertions, 30 deletions
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index 79ad100..9e30ebd 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -37,6 +37,8 @@ #include <media/stagefright/MetaData.h> #include <media/stagefright/Utils.h> +#include <utils/Mutex.h> + #include <ctype.h> #include <openssl/aes.h> #include <openssl/md5.h> @@ -57,10 +59,14 @@ LiveSession::LiveSession( : 0)), mPrevBandwidthIndex(-1), mStreamMask(0), + mNewStreamMask(0), + mSwapMask(0), mCheckBandwidthGeneration(0), + mSwitchGeneration(0), mLastDequeuedTimeUs(0ll), mRealTimeBaseUs(0ll), mReconfigurationInProgress(false), + mSwitchInProgress(false), mDisconnectReplyID(0) { if (mUIDValid) { mHTTPDataSource->setUID(mUID); @@ -72,16 +78,37 @@ LiveSession::LiveSession( for (size_t i = 0; i < kMaxStreams; ++i) { mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); + mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); } } LiveSession::~LiveSession() { } +sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { + ABuffer *discontinuity = new ABuffer(0); + discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); + discontinuity->meta()->setInt32("swapPacketSource", swap); + discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); + discontinuity->meta()->setInt64("timeUs", -1); + return discontinuity; +} + +void LiveSession::swapPacketSource(StreamType stream) { + sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); + sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); + sp<AnotherPacketSource> tmp = aps; + aps = aps2; + aps2 = tmp; + aps2->clear(); +} + status_t LiveSession::dequeueAccessUnit( StreamType stream, sp<ABuffer> *accessUnit) { if (!(mStreamMask & stream)) { - return UNKNOWN_ERROR; + // return -EWOULDBLOCK to avoid halting the decoder + // when switching between audio/video and audio only. + return -EWOULDBLOCK; } sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); @@ -121,6 +148,25 @@ status_t LiveSession::dequeueAccessUnit( streamStr, type, extra == NULL ? "NULL" : extra->debugString().c_str()); + + int32_t swap; + if (type == ATSParser::DISCONTINUITY_FORMATCHANGE + && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) + && swap) { + + int32_t switchGeneration; + CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); + { + Mutex::Autolock lock(mSwapMutex); + if (switchGeneration == mSwitchGeneration) { + swapPacketSource(stream); + sp<AMessage> msg = new AMessage(kWhatSwapped, id()); + msg->setInt32("stream", stream); + msg->setInt32("switchGeneration", switchGeneration); + msg->post(); + } + } + } } else if (err == OK) { if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { int64_t timeUs; @@ -142,6 +188,7 @@ status_t LiveSession::dequeueAccessUnit( } status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { + // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. if (!(mStreamMask & stream)) { return UNKNOWN_ERROR; } @@ -238,7 +285,12 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { if (what == PlaylistFetcher::kWhatStopped) { AString uri; CHECK(msg->findString("uri", &uri)); - mFetcherInfos.removeItem(uri); + if (mFetcherInfos.removeItem(uri) < 0) { + // ignore duplicated kWhatStopped messages. + break; + } + + tryToFinishBandwidthSwitch(); } if (mContinuation != NULL) { @@ -274,6 +326,8 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { postPrepared(err); } + cancelBandwidthSwitch(); + mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); @@ -312,6 +366,27 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } + case PlaylistFetcher::kWhatStartedAt: + { + int32_t switchGeneration; + CHECK(msg->findInt32("switchGeneration", &switchGeneration)); + + if (switchGeneration != mSwitchGeneration) { + break; + } + + // Resume fetcher for the original variant; the resumed fetcher should + // continue until the timestamps found in msg, which is stored by the + // new fetcher to indicate where the new variant has started buffering. + for (size_t i = 0; i < mFetcherInfos.size(); i++) { + const FetcherInfo info = mFetcherInfos.valueAt(i); + if (info.mToBeRemoved) { + info.mFetcher->resumeUntilAsync(msg); + } + } + break; + } + default: TRESPASS(); } @@ -356,6 +431,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } + case kWhatSwapped: + { + onSwapped(msg); + break; + } default: TRESPASS(); break; @@ -466,6 +546,10 @@ void LiveSession::finishDisconnect() { // during disconnection either. cancelCheckBandwidthEvent(); + // Protect mPacketSources from a swapPacketSource race condition through disconnect. + // (finishDisconnect, onFinishDisconnect2) + cancelBandwidthSwitch(); + for (size_t i = 0; i < mFetcherInfos.size(); ++i) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); } @@ -505,11 +589,13 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); notify->setString("uri", uri); + notify->setInt32("switchGeneration", mSwitchGeneration); FetcherInfo info; info.mFetcher = new PlaylistFetcher(notify, this, uri); info.mDurationUs = -1ll; info.mIsPrepared = false; + info.mToBeRemoved = false; looper()->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); @@ -844,8 +930,25 @@ status_t LiveSession::selectTrack(size_t index, bool select) { return err; } +bool LiveSession::canSwitchUp() { + // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. + status_t err = OK; + for (size_t i = 0; i < mPacketSources.size(); ++i) { + sp<AnotherPacketSource> source = mPacketSources.valueAt(i); + int64_t dur = source->getBufferedDurationUs(&err); + if (err == OK && dur > 10000000) { + return true; + } + } + return false; +} + void LiveSession::changeConfiguration( int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { + // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. + // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). + cancelBandwidthSwitch(); + CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; @@ -861,7 +964,8 @@ void LiveSession::changeConfiguration( CHECK_LT(bandwidthIndex, mBandwidthItems.size()); const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); - uint32_t streamMask = 0; + uint32_t streamMask = 0; // streams that should be fetched by the new fetcher + uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher AString URIs[kMaxStreams]; for (size_t i = 0; i < kMaxStreams; ++i) { @@ -879,9 +983,14 @@ void LiveSession::changeConfiguration( // If we're seeking all current fetchers are discarded. if (timeUs < 0ll) { + // delay fetcher removal + discardFetcher = false; + for (size_t j = 0; j < kMaxStreams; ++j) { - if ((streamMask & indexToType(j)) && uri == URIs[j]) { - discardFetcher = false; + StreamType type = indexToType(j); + if ((streamMask & type) && uri == URIs[j]) { + resumeMask |= type; + streamMask &= ~type; } } } @@ -893,8 +1002,15 @@ void LiveSession::changeConfiguration( } } - sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id()); + sp<AMessage> msg; + if (timeUs < 0ll) { + // skip onChangeConfiguration2 (decoder destruction) if switching. + msg = new AMessage(kWhatChangeConfiguration3, id()); + } else { + msg = new AMessage(kWhatChangeConfiguration2, id()); + } msg->setInt32("streamMask", streamMask); + msg->setInt32("resumeMask", resumeMask); msg->setInt64("timeUs", timeUs); for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { @@ -977,11 +1093,13 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { } void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { + mContinuation.clear(); // All remaining fetchers are still suspended, the player has shutdown // any decoders that needed it. - uint32_t streamMask; + uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); + CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { @@ -990,37 +1108,39 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { } int64_t timeUs; + bool switching = false; CHECK(msg->findInt64("timeUs", &timeUs)); if (timeUs < 0ll) { timeUs = mLastDequeuedTimeUs; + switching = true; } mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; - mStreamMask = streamMask; + mNewStreamMask = streamMask; - // Resume all existing fetchers and assign them packet sources. + // Of all existing fetchers: + // * Resume fetchers that are still needed and assign them original packet sources. + // * Mark otherwise unneeded fetchers for removal. + ALOGV("resuming fetchers for mask 0x%08x", resumeMask); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { const AString &uri = mFetcherInfos.keyAt(i); - uint32_t resumeMask = 0; - sp<AnotherPacketSource> sources[kMaxStreams]; for (size_t j = 0; j < kMaxStreams; ++j) { - if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { + if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); - resumeMask |= indexToType(j); } } - CHECK_NE(resumeMask, 0u); - - ALOGV("resuming fetchers for mask 0x%08x", resumeMask); - - streamMask &= ~resumeMask; - - mFetcherInfos.valueAt(i).mFetcher->startAsync( - sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); + FetcherInfo &info = mFetcherInfos.editValueAt(i); + if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL + || sources[kSubtitleIndex] != NULL) { + info.mFetcher->startAsync( + sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); + } else { + info.mToBeRemoved = true; + } } // streamMask now only contains the types that need a new fetcher created. @@ -1029,6 +1149,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { ALOGV("creating new fetchers for mask 0x%08x", streamMask); } + // Find out when the original fetchers have buffered up to and start the new fetchers + // at a later timestamp. for (size_t i = 0; i < kMaxStreams; i++) { if (!(indexToType(i) & streamMask)) { continue; @@ -1040,12 +1162,40 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); CHECK(fetcher != NULL); + int32_t latestSeq = -1; + int64_t latestTimeUs = 0ll; sp<AnotherPacketSource> sources[kMaxStreams]; + // TRICKY: looping from i as earlier streams are already removed from streamMask for (size_t j = i; j < kMaxStreams; ++j) { if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); - sources[j]->clear(); + + if (!switching) { + sources[j]->clear(); + } else { + int32_t type, seq; + int64_t srcTimeUs; + sp<AMessage> meta = sources[j]->getLatestMeta(); + + if (meta != NULL && !meta->findInt32("discontinuity", &type)) { + CHECK(meta->findInt32("seq", &seq)); + if (seq > latestSeq) { + latestSeq = seq; + } + CHECK(meta->findInt64("timeUs", &srcTimeUs)); + if (srcTimeUs > latestTimeUs) { + latestTimeUs = srcTimeUs; + } + } + + sources[j] = mPacketSources2.valueFor(indexToType(j)); + sources[j]->clear(); + uint32_t extraStreams = mNewStreamMask & (~mStreamMask); + if (extraStreams & indexToType(j)) { + sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false)); + } + } streamMask &= ~indexToType(j); } @@ -1055,7 +1205,9 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], - timeUs); + timeUs, + latestTimeUs /* min start time(us) */, + latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ ); } // All fetchers have now been started, the configuration change @@ -1064,14 +1216,61 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { scheduleCheckBandwidthEvent(); ALOGV("XXX configuration change completed."); - mReconfigurationInProgress = false; + if (switching) { + mSwitchInProgress = true; + } else { + mStreamMask = mNewStreamMask; + } if (mDisconnectReplyID != 0) { finishDisconnect(); } } +void LiveSession::onSwapped(const sp<AMessage> &msg) { + int32_t switchGeneration; + CHECK(msg->findInt32("switchGeneration", &switchGeneration)); + if (switchGeneration != mSwitchGeneration) { + return; + } + + int32_t stream; + CHECK(msg->findInt32("stream", &stream)); + mSwapMask |= stream; + if (mSwapMask != mStreamMask) { + return; + } + + // Check if new variant contains extra streams. + uint32_t extraStreams = mNewStreamMask & (~mStreamMask); + while (extraStreams) { + StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); + swapPacketSource(extraStream); + extraStreams &= ~extraStream; + } + + tryToFinishBandwidthSwitch(); +} + +// Mark switch done when: +// 1. all old buffers are swapped out, AND +// 2. all old fetchers are removed. +void LiveSession::tryToFinishBandwidthSwitch() { + bool needToRemoveFetchers = false; + for (size_t i = 0; i < mFetcherInfos.size(); ++i) { + if (mFetcherInfos.valueAt(i).mToBeRemoved) { + needToRemoveFetchers = true; + break; + } + } + if (!needToRemoveFetchers && mSwapMask == mStreamMask) { + mStreamMask = mNewStreamMask; + mSwitchInProgress = false; + mSwapMask = 0; + } +} + void LiveSession::scheduleCheckBandwidthEvent() { sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); msg->setInt32("generation", mCheckBandwidthGeneration); @@ -1082,16 +1281,37 @@ void LiveSession::cancelCheckBandwidthEvent() { ++mCheckBandwidthGeneration; } -void LiveSession::onCheckBandwidth() { - if (mReconfigurationInProgress) { - scheduleCheckBandwidthEvent(); - return; +void LiveSession::cancelBandwidthSwitch() { + Mutex::Autolock lock(mSwapMutex); + mSwitchGeneration++; + mSwitchInProgress = false; + mSwapMask = 0; +} + +bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { + if (mReconfigurationInProgress || mSwitchInProgress) { + return false; + } + + if (mPrevBandwidthIndex < 0) { + return true; } + if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { + return false; + } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { + return canSwitchUp(); + } else { + return true; + } +} + +void LiveSession::onCheckBandwidth() { size_t bandwidthIndex = getBandwidthIndex(); - if (mPrevBandwidthIndex < 0 - || bandwidthIndex != (size_t)mPrevBandwidthIndex) { + if (canSwitchBandwidthTo(bandwidthIndex)) { changeConfiguration(-1ll /* timeUs */, bandwidthIndex); + } else { + scheduleCheckBandwidthEvent(); } // Handling the kWhatCheckBandwidth even here does _not_ automatically |