diff options
Diffstat (limited to 'media/libstagefright/httplive/LiveSession.cpp')
-rw-r--r-- | media/libstagefright/httplive/LiveSession.cpp | 1489 |
1 files changed, 951 insertions, 538 deletions
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index d0f3bc2..2d93152 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -33,6 +33,7 @@ #include <media/stagefright/foundation/ABuffer.h> #include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/foundation/AUtils.h> #include <media/stagefright/DataSource.h> #include <media/stagefright/FileSource.h> #include <media/stagefright/MediaErrors.h> @@ -49,8 +50,96 @@ namespace android { -// Number of recently-read bytes to use for bandwidth estimation -const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024; +// static +// Bandwidth Switch Mark Defaults +const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; +const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; +const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; +const int64_t LiveSession::kResumeThresholdUs = 100000ll; + +// Buffer Prepare/Ready/Underflow Marks +const int64_t LiveSession::kReadyMarkUs = 5000000ll; +const int64_t LiveSession::kPrepareMarkUs = 1500000ll; +const int64_t LiveSession::kUnderflowMarkUs = 1000000ll; + +struct LiveSession::BandwidthEstimator : public RefBase { + BandwidthEstimator(); + + void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); + bool estimateBandwidth(int32_t *bandwidth); + +private: + // Bandwidth estimation parameters + static const int32_t kMaxBandwidthHistoryItems = 20; + static const int64_t kMaxBandwidthHistoryWindowUs = 5000000ll; // 5 sec + + struct BandwidthEntry { + int64_t mDelayUs; + size_t mNumBytes; + }; + + Mutex mLock; + List<BandwidthEntry> mBandwidthHistory; + int64_t mTotalTransferTimeUs; + size_t mTotalTransferBytes; + + DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); +}; + +LiveSession::BandwidthEstimator::BandwidthEstimator() : + mTotalTransferTimeUs(0), + mTotalTransferBytes(0) { +} + +void LiveSession::BandwidthEstimator::addBandwidthMeasurement( + size_t numBytes, int64_t delayUs) { + AutoMutex autoLock(mLock); + + BandwidthEntry entry; + entry.mDelayUs = delayUs; + entry.mNumBytes = numBytes; + mTotalTransferTimeUs += delayUs; + mTotalTransferBytes += numBytes; + mBandwidthHistory.push_back(entry); + + // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, + // and total transfer time at least kMaxBandwidthHistoryWindowUs. + while (mBandwidthHistory.size() > kMaxBandwidthHistoryItems) { + List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); + if (mTotalTransferTimeUs - it->mDelayUs < kMaxBandwidthHistoryWindowUs) { + break; + } + mTotalTransferTimeUs -= it->mDelayUs; + mTotalTransferBytes -= it->mNumBytes; + mBandwidthHistory.erase(mBandwidthHistory.begin()); + } +} + +bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) { + AutoMutex autoLock(mLock); + + if (mBandwidthHistory.size() < 2) { + return false; + } + + *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); + return true; +} + +//static +const char *LiveSession::getKeyForStream(StreamType type) { + switch (type) { + case STREAMTYPE_VIDEO: + return "timeUsVideo"; + case STREAMTYPE_AUDIO: + return "timeUsAudio"; + case STREAMTYPE_SUBTITLES: + return "timeUsSubtitle"; + default: + TRESPASS(); + } + return NULL; +} LiveSession::LiveSession( const sp<AMessage> ¬ify, uint32_t flags, @@ -58,146 +147,68 @@ LiveSession::LiveSession( : mNotify(notify), mFlags(flags), mHTTPService(httpService), + mBuffering(false), mInPreparationPhase(true), + mPollBufferingGeneration(0), + mPrevBufferPercentage(-1), mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), mCurBandwidthIndex(-1), + mOrigBandwidthIndex(-1), + mLastBandwidthBps(-1ll), + mBandwidthEstimator(new BandwidthEstimator()), mStreamMask(0), mNewStreamMask(0), mSwapMask(0), - mCheckBandwidthGeneration(0), mSwitchGeneration(0), mSubtitleGeneration(0), mLastDequeuedTimeUs(0ll), mRealTimeBaseUs(0ll), mReconfigurationInProgress(false), mSwitchInProgress(false), - mDisconnectReplyID(0), - mSeekReplyID(0), + mUpSwitchMark(kUpSwitchMarkUs), + mDownSwitchMark(kDownSwitchMarkUs), + mUpSwitchMargin(kUpSwitchMarginUs), mFirstTimeUsValid(false), mFirstTimeUs(0), mLastSeekTimeUs(0) { - mStreams[kAudioIndex] = StreamItem("audio"); mStreams[kVideoIndex] = StreamItem("video"); mStreams[kSubtitleIndex] = StreamItem("subtitles"); for (size_t i = 0; i < kMaxStreams; ++i) { - mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); - mBuffering[i] = false; - } - - size_t numHistoryItems = kBandwidthHistoryBytes / - PlaylistFetcher::kDownloadBlockSize + 1; - if (numHistoryItems < 5) { - numHistoryItems = 5; } - mHTTPDataSource->setBandwidthHistorySize(numHistoryItems); } LiveSession::~LiveSession() { -} - -sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { - ABuffer *discontinuity = new ABuffer(0); - discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); - discontinuity->meta()->setInt32("swapPacketSource", swap); - discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); - discontinuity->meta()->setInt64("timeUs", -1); - return discontinuity; -} - -void LiveSession::swapPacketSource(StreamType stream) { - sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); - sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); - sp<AnotherPacketSource> tmp = aps; - aps = aps2; - aps2 = tmp; - aps2->clear(); + if (mFetcherLooper != NULL) { + mFetcherLooper->stop(); + } } status_t LiveSession::dequeueAccessUnit( StreamType stream, sp<ABuffer> *accessUnit) { - if (!(mStreamMask & stream)) { - // return -EWOULDBLOCK to avoid halting the decoder - // when switching between audio/video and audio only. - return -EWOULDBLOCK; - } - - status_t finalResult; - sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); - if (discontinuityQueue->hasBufferAvailable(&finalResult)) { - discontinuityQueue->dequeueAccessUnit(accessUnit); - // seeking, track switching - sp<AMessage> extra; - int64_t timeUs; - if ((*accessUnit)->meta()->findMessage("extra", &extra) - && extra != NULL - && extra->findInt64("timeUs", &timeUs)) { - // seeking only - mLastSeekTimeUs = timeUs; - mDiscontinuityOffsetTimesUs.clear(); - mDiscontinuityAbsStartTimesUs.clear(); - } - return INFO_DISCONTINUITY; - } - + status_t finalResult = OK; sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); ssize_t idx = typeToIndex(stream); - if (!packetSource->hasBufferAvailable(&finalResult)) { + // Do not let client pull data if we don't have data packets yet. + // We might only have a format discontinuity queued without data. + // When NuPlayerDecoder dequeues the format discontinuity, it will + // immediately try to getFormat. If we return NULL, NuPlayerDecoder + // thinks it can do seamless change, so will not shutdown decoder. + // When the actual format arrives, it can't handle it and get stuck. + if (!packetSource->hasDataBufferAvailable(&finalResult)) { if (finalResult == OK) { - mBuffering[idx] = true; return -EAGAIN; } else { return finalResult; } } - int32_t targetDuration = 0; - sp<AMessage> meta = packetSource->getLatestEnqueuedMeta(); - if (meta != NULL) { - meta->findInt32("targetDuration", &targetDuration); - } - - int64_t targetDurationUs = targetDuration * 1000000ll; - if (targetDurationUs == 0 || - targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) { - // Fetchers limit buffering to - // min(3 * targetDuration, kMinBufferedDurationUs) - targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs; - } - - if (mBuffering[idx]) { - if (mSwitchInProgress - || packetSource->isFinished(0) - || packetSource->getEstimatedDurationUs() > targetDurationUs) { - mBuffering[idx] = false; - } - } - - if (mBuffering[idx]) { - return -EAGAIN; - } - - // wait for counterpart - sp<AnotherPacketSource> otherSource; - uint32_t mask = mNewStreamMask & mStreamMask; - uint32_t fetchersMask = 0; - for (size_t i = 0; i < mFetcherInfos.size(); ++i) { - uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); - fetchersMask |= fetcherMask; - } - mask &= fetchersMask; - if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { - otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); - } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { - otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); - } - if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { - return finalResult == OK ? -EAGAIN : finalResult; - } + // Let the client dequeue as long as we have buffers available + // Do not make pause/resume decisions here. status_t err = packetSource->dequeueAccessUnit(accessUnit); @@ -235,42 +246,6 @@ status_t LiveSession::dequeueAccessUnit( streamStr, type, extra == NULL ? "NULL" : extra->debugString().c_str()); - - int32_t swap; - if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { - int32_t switchGeneration; - CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); - { - Mutex::Autolock lock(mSwapMutex); - if (switchGeneration == mSwitchGeneration) { - swapPacketSource(stream); - sp<AMessage> msg = new AMessage(kWhatSwapped, id()); - msg->setInt32("stream", stream); - msg->setInt32("switchGeneration", switchGeneration); - msg->post(); - } - } - } else { - size_t seq = strm.mCurDiscontinuitySeq; - int64_t offsetTimeUs; - if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { - offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); - } else { - offsetTimeUs = 0; - } - - seq += 1; - if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { - int64_t firstTimeUs; - firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); - offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; - offsetTimeUs += strm.mLastSampleDurationUs; - } else { - offsetTimeUs += strm.mLastSampleDurationUs; - } - - mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); - } } else if (err == OK) { if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { @@ -278,7 +253,26 @@ status_t LiveSession::dequeueAccessUnit( int32_t discontinuitySeq = 0; CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); - strm.mCurDiscontinuitySeq = discontinuitySeq; + if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { + int64_t offsetTimeUs; + if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); + } else { + offsetTimeUs = 0; + } + + if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + int64_t firstTimeUs; + firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); + offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; + offsetTimeUs += strm.mLastSampleDurationUs; + } else { + offsetTimeUs += strm.mLastSampleDurationUs; + } + + mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); + strm.mCurDiscontinuitySeq = discontinuitySeq; + } int32_t discard = 0; int64_t firstTimeUs; @@ -331,7 +325,6 @@ status_t LiveSession::dequeueAccessUnit( } status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { - // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. if (!(mStreamMask & stream)) { return UNKNOWN_ERROR; } @@ -344,12 +337,21 @@ status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { return -EAGAIN; } + if (stream == STREAMTYPE_AUDIO) { + // set AAC input buffer size to 32K bytes (256kbps x 1sec) + meta->setInt32(kKeyMaxInputSize, 32 * 1024); + } + return convertMetaDataToMessage(meta, format); } +sp<HTTPBase> LiveSession::getHTTPDataSource() { + return new MediaHTTP(mHTTPService->makeHTTPConnection()); +} + void LiveSession::connectAsync( const char *url, const KeyedVector<String8, String8> *headers) { - sp<AMessage> msg = new AMessage(kWhatConnect, id()); + sp<AMessage> msg = new AMessage(kWhatConnect, this); msg->setString("url", url); if (headers != NULL) { @@ -362,7 +364,7 @@ void LiveSession::connectAsync( } status_t LiveSession::disconnect() { - sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); + sp<AMessage> msg = new AMessage(kWhatDisconnect, this); sp<AMessage> response; status_t err = msg->postAndAwaitResponse(&response); @@ -371,7 +373,7 @@ status_t LiveSession::disconnect() { } status_t LiveSession::seekTo(int64_t timeUs) { - sp<AMessage> msg = new AMessage(kWhatSeek, id()); + sp<AMessage> msg = new AMessage(kWhatSeek, this); msg->setInt64("timeUs", timeUs); sp<AMessage> response; @@ -380,6 +382,95 @@ status_t LiveSession::seekTo(int64_t timeUs) { return err; } +bool LiveSession::checkSwitchProgress( + sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) { + AString newUri; + CHECK(stopParams->findString("uri", &newUri)); + + *needResumeUntil = false; + sp<AMessage> firstNewMeta[kMaxStreams]; + for (size_t i = 0; i < kMaxStreams; ++i) { + StreamType stream = indexToType(i); + if (!(mSwapMask & mNewStreamMask & stream) + || (mStreams[i].mNewUri != newUri)) { + continue; + } + if (stream == STREAMTYPE_SUBTITLES) { + continue; + } + sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i); + + // First, get latest dequeued meta, which is where the decoder is at. + // (when upswitching, we take the meta after a certain delay, so that + // the decoder is left with some cushion) + sp<AMessage> lastDequeueMeta, lastEnqueueMeta; + if (delayUs > 0) { + lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); + if (lastDequeueMeta == NULL) { + // this means we don't have enough cushion, try again later + ALOGV("[%s] up switching failed due to insufficient buffer", + stream == STREAMTYPE_AUDIO ? "audio" : "video"); + return false; + } + } else { + // It's okay for lastDequeueMeta to be NULL here, it means the + // decoder hasn't even started dequeueing + lastDequeueMeta = source->getLatestDequeuedMeta(); + } + // Then, trim off packets at beginning of mPacketSources2 that's before + // the latest dequeued time. These samples are definitely too late. + firstNewMeta[i] = mPacketSources2.editValueAt(i) + ->trimBuffersBeforeMeta(lastDequeueMeta); + + // Now firstNewMeta[i] is the first sample after the trim. + // If it's NULL, we failed because dequeue already past all samples + // in mPacketSource2, we have to try again. + if (firstNewMeta[i] == NULL) { + HLSTime dequeueTime(lastDequeueMeta); + ALOGV("[%s] dequeue time (%d, %lld) past start time", + stream == STREAMTYPE_AUDIO ? "audio" : "video", + dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); + return false; + } + + // Otherwise, we check if mPacketSources2 overlaps with what old fetcher + // already fetched, and see if we need to resumeUntil + lastEnqueueMeta = source->getLatestEnqueuedMeta(); + // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity + // boundary, no need to resume as the content will look different anyways + if (lastEnqueueMeta != NULL) { + HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); + + // no need to resume old fetcher if new fetcher started in different + // discontinuity sequence, as the content will look different. + *needResumeUntil |= (startTime.mSeq == lastTime.mSeq + && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); + + // update the stopTime for resumeUntil + stopParams->setInt32("discontinuitySeq", startTime.mSeq); + stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); + } + } + + // if we're here, it means dequeue progress hasn't passed some samples in + // mPacketSource2, we can trim off the excess in mPacketSource. + // (old fetcher might still need to resumeUntil the start time of new fetcher) + for (size_t i = 0; i < kMaxStreams; ++i) { + StreamType stream = indexToType(i); + if (!(mSwapMask & mNewStreamMask & stream) + || (newUri != mStreams[i].mNewUri) + || stream == STREAMTYPE_SUBTITLES) { + continue; + } + mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); + } + + // no resumeUntil if already underflow + *needResumeUntil &= !mBuffering; + + return true; +} + void LiveSession::onMessageReceived(const sp<AMessage> &msg) { switch (msg->what()) { case kWhatConnect: @@ -402,7 +493,7 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { case kWhatSeek: { - uint32_t seekReplyID; + sp<AReplyToken> seekReplyID; CHECK(msg->senderAwaitsResponse(&seekReplyID)); mSeekReplyID = seekReplyID; mSeekReply = new AMessage; @@ -426,16 +517,25 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { case PlaylistFetcher::kWhatPaused: case PlaylistFetcher::kWhatStopped: { - if (what == PlaylistFetcher::kWhatStopped) { - AString uri; - CHECK(msg->findString("uri", &uri)); - if (mFetcherInfos.removeItem(uri) < 0) { - // ignore duplicated kWhatStopped messages. - break; - } + AString uri; + CHECK(msg->findString("uri", &uri)); + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index < 0) { + // ignore msgs from fetchers that's already gone + break; + } - if (mSwitchInProgress) { - tryToFinishBandwidthSwitch(); + if (what == PlaylistFetcher::kWhatStopped) { + mFetcherLooper->unregisterHandler( + mFetcherInfos[index].mFetcher->id()); + mFetcherInfos.removeItemsAt(index); + } else if (what == PlaylistFetcher::kWhatPaused) { + int32_t seekMode; + CHECK(msg->findInt32("seekMode", &seekMode)); + for (size_t i = 0; i < kMaxStreams; ++i) { + if (mStreams[i].mUri == uri) { + mStreams[i].mSeekMode = (SeekMode) seekMode; + } } } @@ -443,14 +543,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { CHECK_GT(mContinuationCounter, 0); if (--mContinuationCounter == 0) { mContinuation->post(); - - if (mSeekReplyID != 0) { - CHECK(mSeekReply != NULL); - mSeekReply->setInt32("err", OK); - mSeekReply->postReply(mSeekReplyID); - mSeekReplyID = 0; - mSeekReply.clear(); - } } } break; @@ -464,8 +556,21 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { int64_t durationUs; CHECK(msg->findInt64("durationUs", &durationUs)); - FetcherInfo *info = &mFetcherInfos.editValueFor(uri); - info->mDurationUs = durationUs; + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index >= 0) { + FetcherInfo *info = &mFetcherInfos.editValueFor(uri); + info->mDurationUs = durationUs; + } + break; + } + + case PlaylistFetcher::kWhatTargetDurationUpdate: + { + int64_t targetDurationUs; + CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); + mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); + mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); + mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); break; } @@ -506,38 +611,23 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { mPacketSources.valueFor( STREAMTYPE_SUBTITLES)->signalEOS(err); - sp<AMessage> notify = mNotify->dup(); - notify->setInt32("what", kWhatError); - notify->setInt32("err", err); - notify->post(); + postError(err); break; } - case PlaylistFetcher::kWhatTemporarilyDoneFetching: + case PlaylistFetcher::kWhatStopReached: { - AString uri; - CHECK(msg->findString("uri", &uri)); + ALOGV("kWhatStopReached"); - if (mFetcherInfos.indexOfKey(uri) < 0) { - ALOGE("couldn't find uri"); + AString oldUri; + CHECK(msg->findString("uri", &oldUri)); + + ssize_t index = mFetcherInfos.indexOfKey(oldUri); + if (index < 0) { break; } - FetcherInfo *info = &mFetcherInfos.editValueFor(uri); - info->mIsPrepared = true; - - if (mInPreparationPhase) { - bool allFetchersPrepared = true; - for (size_t i = 0; i < mFetcherInfos.size(); ++i) { - if (!mFetcherInfos.valueAt(i).mIsPrepared) { - allFetchersPrepared = false; - break; - } - } - if (allFetchersPrepared) { - postPrepared(OK); - } - } + tryToFinishBandwidthSwitch(oldUri); break; } @@ -550,15 +640,69 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } - // Resume fetcher for the original variant; the resumed fetcher should - // continue until the timestamps found in msg, which is stored by the - // new fetcher to indicate where the new variant has started buffering. - for (size_t i = 0; i < mFetcherInfos.size(); i++) { - const FetcherInfo info = mFetcherInfos.valueAt(i); - if (info.mToBeRemoved) { - info.mFetcher->resumeUntilAsync(msg); + AString uri; + CHECK(msg->findString("uri", &uri)); + + // mark new fetcher mToBeResumed + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index >= 0) { + mFetcherInfos.editValueAt(index).mToBeResumed = true; + } + + // temporarily disable packet sources to be swapped to prevent + // NuPlayerDecoder from dequeuing while we check progress + for (size_t i = 0; i < mPacketSources.size(); ++i) { + if ((mSwapMask & mPacketSources.keyAt(i)) + && uri == mStreams[i].mNewUri) { + mPacketSources.editValueAt(i)->enable(false); + } + } + bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); + // If switching up, require a cushion bigger than kUnderflowMark + // to avoid buffering immediately after the switch. + // (If we don't have that cushion we'd rather cancel and try again.) + int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0; + bool needResumeUntil = false; + sp<AMessage> stopParams = msg; + if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { + // playback time hasn't passed startAt time + if (!needResumeUntil) { + for (size_t i = 0; i < kMaxStreams; ++i) { + if ((mSwapMask & indexToType(i)) + && uri == mStreams[i].mNewUri) { + // have to make a copy of mStreams[i].mUri because + // tryToFinishBandwidthSwitch is modifying mStreams[] + AString oldURI = mStreams[i].mUri; + tryToFinishBandwidthSwitch(oldURI); + break; + } + } + } else { + // startAt time is after last enqueue time + // Resume fetcher for the original variant; the resumed fetcher should + // continue until the timestamps found in msg, which is stored by the + // new fetcher to indicate where the new variant has started buffering. + for (size_t i = 0; i < mFetcherInfos.size(); i++) { + const FetcherInfo &info = mFetcherInfos.valueAt(i); + if (info.mToBeRemoved) { + info.mFetcher->resumeUntilAsync(stopParams); + } + } + } + } else { + // playback time passed startAt time + if (switchUp) { + // if switching up, cancel and retry if condition satisfies again + cancelBandwidthSwitch(true /* resume */); + } else { + resumeFetcher(uri, mSwapMask, -1, true /* newUri */); } } + // re-enable all packet sources + for (size_t i = 0; i < mPacketSources.size(); ++i) { + mPacketSources.editValueAt(i)->enable(true); + } + break; } @@ -569,19 +713,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } - case kWhatCheckBandwidth: - { - int32_t generation; - CHECK(msg->findInt32("generation", &generation)); - - if (generation != mCheckBandwidthGeneration) { - break; - } - - onCheckBandwidth(msg); - break; - } - case kWhatChangeConfiguration: { onChangeConfiguration(msg); @@ -606,21 +737,13 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } - case kWhatSwapped: + case kWhatPollBuffering: { - onSwapped(msg); - break; - } - - case kWhatCheckSwitchDown: - { - onCheckSwitchDown(); - break; - } - - case kWhatSwitchDown: - { - onSwitchDown(); + int32_t generation; + CHECK(msg->findInt32("generation", &generation)); + if (generation == mPollBufferingGeneration) { + onPollBuffering(); + } break; } @@ -691,6 +814,14 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { return; } + // create looper for fetchers + if (mFetcherLooper == NULL) { + mFetcherLooper = new ALooper(); + + mFetcherLooper->setName("Fetcher"); + mFetcherLooper->start(false, false); + } + // We trust the content provider to make a reasonable choice of preferred // initial bandwidth by listing it first in the variant playlist. // At startup we really don't have a good estimate on the available @@ -700,6 +831,7 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { size_t initialBandwidthIndex = 0; if (mPlaylist->isVariantPlaylist()) { + Vector<BandwidthItem> itemsWithVideo; for (size_t i = 0; i < mPlaylist->size(); ++i) { BandwidthItem item; @@ -711,14 +843,22 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); - if (initialBandwidth == 0) { - initialBandwidth = item.mBandwidth; - } - mBandwidthItems.push(item); + if (mPlaylist->hasType(i, "video")) { + itemsWithVideo.push(item); + } + } + // remove the audio-only variants if we have at least one with video + if (!itemsWithVideo.empty() + && itemsWithVideo.size() < mBandwidthItems.size()) { + mBandwidthItems.clear(); + for (size_t i = 0; i < itemsWithVideo.size(); ++i) { + mBandwidthItems.push(itemsWithVideo[i]); + } } CHECK_GT(mBandwidthItems.size(), 0u); + initialBandwidth = mBandwidthItems[0].mBandwidth; mBandwidthItems.sort(SortByBandwidth); @@ -742,22 +882,20 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { } void LiveSession::finishDisconnect() { + ALOGV("finishDisconnect"); + // No reconfiguration is currently pending, make sure none will trigger // during disconnection either. - cancelCheckBandwidthEvent(); - - // Protect mPacketSources from a swapPacketSource race condition through disconnect. - // (finishDisconnect, onFinishDisconnect2) cancelBandwidthSwitch(); - // cancel switch down monitor - mSwitchDownMonitor.clear(); + // cancel buffer polling + cancelPollBuffering(); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); } - sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); + sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this); mContinuationCounter = mFetcherInfos.size(); mContinuation = msg; @@ -780,7 +918,7 @@ void LiveSession::onFinishDisconnect2() { response->setInt32("err", OK); response->postReply(mDisconnectReplyID); - mDisconnectReplyID = 0; + mDisconnectReplyID.clear(); } sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { @@ -790,16 +928,16 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { return NULL; } - sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); + sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); notify->setString("uri", uri); notify->setInt32("switchGeneration", mSwitchGeneration); FetcherInfo info; info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); info.mDurationUs = -1ll; - info.mIsPrepared = false; info.mToBeRemoved = false; - looper()->registerHandler(info.mFetcher); + info.mToBeResumed = false; + mFetcherLooper->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); @@ -827,14 +965,15 @@ ssize_t LiveSession::fetchFile( int64_t range_offset, int64_t range_length, uint32_t block_size, /* download block size */ sp<DataSource> *source, /* to return and reuse source */ - String8 *actualUrl) { + String8 *actualUrl, + bool forceConnectHTTP /* force connect HTTP when resuing source */) { off64_t size; sp<DataSource> temp_source; if (source == NULL) { source = &temp_source; } - if (*source == NULL) { + if (*source == NULL || forceConnectHTTP) { if (!strncasecmp(url, "file://", 7)) { *source = new FileSource(url + 7); } else if (strncasecmp(url, "http://", 7) @@ -853,13 +992,18 @@ ssize_t LiveSession::fetchFile( ? "" : AStringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str())); } - status_t err = mHTTPDataSource->connect(url, &headers); + + HTTPBase* httpDataSource = + (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get(); + status_t err = httpDataSource->connect(url, &headers); if (err != OK) { return err; } - *source = mHTTPDataSource; + if (*source == NULL) { + *source = mHTTPDataSource; + } } } @@ -949,6 +1093,9 @@ sp<M3UParser> LiveSession::fetchPlaylist( String8 actualUrl; ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); + // close off the connection after use + mHTTPDataSource->disconnect(); + if (err <= 0) { return NULL; } @@ -995,8 +1142,108 @@ static double uniformRand() { } #endif -size_t LiveSession::getBandwidthIndex() { - if (mBandwidthItems.size() == 0) { +bool LiveSession::resumeFetcher( + const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index < 0) { + ALOGE("did not find fetcher for uri: %s", uri.c_str()); + return false; + } + + bool resume = false; + sp<AnotherPacketSource> sources[kMaxStreams]; + for (size_t i = 0; i < kMaxStreams; ++i) { + if ((streamMask & indexToType(i)) + && ((!newUri && uri == mStreams[i].mUri) + || (newUri && uri == mStreams[i].mNewUri))) { + resume = true; + if (newUri) { + sources[i] = mPacketSources2.valueFor(indexToType(i)); + sources[i]->clear(); + } else { + sources[i] = mPacketSources.valueFor(indexToType(i)); + } + } + } + + if (resume) { + ALOGV("resuming fetcher %s, timeUs %lld", uri.c_str(), (long long)timeUs); + SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; + mFetcherInfos.editValueAt(index).mFetcher->startAsync( + sources[kAudioIndex], + sources[kVideoIndex], + sources[kSubtitleIndex], + timeUs, -1, -1, seekMode); + } + + return resume; +} + +float LiveSession::getAbortThreshold( + ssize_t currentBWIndex, ssize_t targetBWIndex) const { + float abortThreshold = -1.0f; + if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { + /* + If we're switching down, we need to decide whether to + + 1) finish last segment of high-bandwidth variant, or + 2) abort last segment of high-bandwidth variant, and fetch an + overlapping portion from low-bandwidth variant. + + Here we try to maximize the amount of buffer left when the + switch point is met. Given the following parameters: + + B: our current buffering level in seconds + T: target duration in seconds + X: sample duration in seconds remain to fetch in last segment + bw0: bandwidth of old variant (as specified in playlist) + bw1: bandwidth of new variant (as specified in playlist) + bw: measured bandwidth available + + If we choose 1), when switch happens at the end of current + segment, our buffering will be + B + X - X * bw0 / bw + + If we choose 2), when switch happens where we aborted current + segment, our buffering will be + B - (T - X) * bw1 / bw + + We should only choose 1) if + X/T < bw1 / (bw1 + bw0 - bw) + */ + + // Taking the measured current bandwidth at 50% face value only, + // as our bandwidth estimation is a lagging indicator. Being + // conservative on this, we prefer switching to lower bandwidth + // unless we're really confident finishing up the last segment + // of higher bandwidth will be fast. + CHECK(mLastBandwidthBps >= 0); + abortThreshold = + (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth + / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth + + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth + - (float)mLastBandwidthBps * 0.5f); + if (abortThreshold < 0.0f) { + abortThreshold = -1.0f; // do not abort + } + ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", + mBandwidthItems.itemAt(currentBWIndex).mBandwidth, + mBandwidthItems.itemAt(targetBWIndex).mBandwidth, + mLastBandwidthBps, + abortThreshold); + } + return abortThreshold; +} + +void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { + mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); +} + +size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { + if (mBandwidthItems.size() < 2) { + // shouldn't be here if we only have 1 bandwidth, check + // logic to get rid of redundant bandwidth polling + ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); return 0; } @@ -1014,15 +1261,6 @@ size_t LiveSession::getBandwidthIndex() { } if (index < 0) { - int32_t bandwidthBps; - if (mHTTPDataSource != NULL - && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { - ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); - } else { - ALOGV("no bandwidth estimate."); - return 0; // Pick the lowest bandwidth stream by default. - } - char value[PROPERTY_VALUE_MAX]; if (property_get("media.httplive.max-bw", value, NULL)) { char *end; @@ -1039,15 +1277,9 @@ size_t LiveSession::getBandwidthIndex() { index = mBandwidthItems.size() - 1; while (index > 0) { - // consider only 80% of the available bandwidth, but if we are switching up, - // be even more conservative (70%) to avoid overestimating and immediately - // switching back. - size_t adjustedBandwidthBps = bandwidthBps; - if (index > mCurBandwidthIndex) { - adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; - } else { - adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; - } + // be conservative (70%) to avoid overestimating and immediately + // switching down again. + size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { break; } @@ -1107,22 +1339,14 @@ size_t LiveSession::getBandwidthIndex() { return index; } -int64_t LiveSession::latestMediaSegmentStartTimeUs() { - sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta(); - int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1; - if (audioMeta != NULL) { - audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs); - } +HLSTime LiveSession::latestMediaSegmentStartTime() const { + HLSTime audioTime(mPacketSources.valueFor( + STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); - sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta(); - if (videoMeta != NULL - && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) { - if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) { - minSegmentStartTimeUs = videoSegmentStartTimeUs; - } + HLSTime videoTime(mPacketSources.valueFor( + STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); - } - return minSegmentStartTimeUs; + return audioTime < videoTime ? videoTime : audioTime; } status_t LiveSession::onSeek(const sp<AMessage> &msg) { @@ -1130,7 +1354,7 @@ status_t LiveSession::onSeek(const sp<AMessage> &msg) { CHECK(msg->findInt64("timeUs", &timeUs)); if (!mReconfigurationInProgress) { - changeConfiguration(timeUs, mCurBandwidthIndex); + changeConfiguration(timeUs); return OK; } else { return -EWOULDBLOCK; @@ -1185,8 +1409,7 @@ status_t LiveSession::selectTrack(size_t index, bool select) { ++mSubtitleGeneration; status_t err = mPlaylist->selectTrack(index, select); if (err == OK) { - sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); - msg->setInt32("bandwidthIndex", mCurBandwidthIndex); + sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this); msg->setInt32("pickTrack", select); msg->post(); } @@ -1201,35 +1424,18 @@ ssize_t LiveSession::getSelectedTrack(media_track_type type) const { } } -bool LiveSession::canSwitchUp() { - // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. - status_t err = OK; - for (size_t i = 0; i < mPacketSources.size(); ++i) { - sp<AnotherPacketSource> source = mPacketSources.valueAt(i); - int64_t dur = source->getBufferedDurationUs(&err); - if (err == OK && dur > 10000000) { - return true; - } - } - return false; -} - void LiveSession::changeConfiguration( - int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { - // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. - // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). + int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { cancelBandwidthSwitch(); CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; - - mCurBandwidthIndex = bandwidthIndex; - - ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", - timeUs, bandwidthIndex, pickTrack); - - CHECK_LT(bandwidthIndex, mBandwidthItems.size()); - const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); + if (bandwidthIndex >= 0) { + mOrigBandwidthIndex = mCurBandwidthIndex; + mCurBandwidthIndex = bandwidthIndex; + } + CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); + const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); uint32_t streamMask = 0; // streams that should be fetched by the new fetcher uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher @@ -1244,38 +1450,58 @@ void LiveSession::changeConfiguration( // Step 1, stop and discard fetchers that are no longer needed. // Pause those that we'll reuse. for (size_t i = 0; i < mFetcherInfos.size(); ++i) { - const AString &uri = mFetcherInfos.keyAt(i); - - bool discardFetcher = true; + // skip fetchers that are marked mToBeRemoved, + // these are done and can't be reused + if (mFetcherInfos[i].mToBeRemoved) { + continue; + } - // If we're seeking all current fetchers are discarded. - if (timeUs < 0ll) { - // delay fetcher removal if not picking tracks - discardFetcher = pickTrack; + const AString &uri = mFetcherInfos.keyAt(i); + sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher; - for (size_t j = 0; j < kMaxStreams; ++j) { - StreamType type = indexToType(j); - if ((streamMask & type) && uri == URIs[j]) { - resumeMask |= type; - streamMask &= ~type; - discardFetcher = false; - } + bool discardFetcher = true, delayRemoval = false; + for (size_t j = 0; j < kMaxStreams; ++j) { + StreamType type = indexToType(j); + if ((streamMask & type) && uri == URIs[j]) { + resumeMask |= type; + streamMask &= ~type; + discardFetcher = false; } } + // Delay fetcher removal if not picking tracks, AND old fetcher + // has stream mask that overlaps new variant. (Okay to discard + // old fetcher now, if completely no overlap.) + if (discardFetcher && timeUs < 0ll && !pickTrack + && (fetcher->getStreamTypeMask() & streamMask)) { + discardFetcher = false; + delayRemoval = true; + } if (discardFetcher) { - mFetcherInfos.valueAt(i).mFetcher->stopAsync(); + fetcher->stopAsync(); } else { - mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); + float threshold = -1.0f; // always finish fetching by default + if (timeUs >= 0ll) { + // seeking, no need to finish fetching + threshold = 0.0f; + } else if (delayRemoval) { + // adapting, abort if remaining of current segment is over threshold + threshold = getAbortThreshold( + mOrigBandwidthIndex, mCurBandwidthIndex); + } + + ALOGV("Pausing with threshold %.3f", threshold); + + fetcher->pauseAsync(threshold); } } sp<AMessage> msg; if (timeUs < 0ll) { // skip onChangeConfiguration2 (decoder destruction) if not seeking. - msg = new AMessage(kWhatChangeConfiguration3, id()); + msg = new AMessage(kWhatChangeConfiguration3, this); } else { - msg = new AMessage(kWhatChangeConfiguration2, id()); + msg = new AMessage(kWhatChangeConfiguration2, this); } msg->setInt32("streamMask", streamMask); msg->setInt32("resumeMask", resumeMask); @@ -1296,23 +1522,14 @@ void LiveSession::changeConfiguration( if (mContinuationCounter == 0) { msg->post(); - - if (mSeekReplyID != 0) { - CHECK(mSeekReply != NULL); - mSeekReply->setInt32("err", OK); - mSeekReply->postReply(mSeekReplyID); - mSeekReplyID = 0; - mSeekReply.clear(); - } } } void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { if (!mReconfigurationInProgress) { - int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; + int32_t pickTrack = 0; msg->findInt32("pickTrack", &pickTrack); - msg->findInt32("bandwidthIndex", &bandwidthIndex); - changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); + changeConfiguration(-1ll /* timeUs */, -1, pickTrack); } else { msg->post(1000000ll); // retry in 1 sec } @@ -1323,13 +1540,42 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { // All fetchers are either suspended or have been removed now. + // If we're seeking, clear all packet sources before we report + // seek complete, to prevent decoder from pulling stale data. + int64_t timeUs; + CHECK(msg->findInt64("timeUs", &timeUs)); + + if (timeUs >= 0) { + mLastSeekTimeUs = timeUs; + + for (size_t i = 0; i < mPacketSources.size(); i++) { + mPacketSources.editValueAt(i)->clear(); + } + + for (size_t i = 0; i < kMaxStreams; ++i) { + mStreams[i].mCurDiscontinuitySeq = 0; + } + + mDiscontinuityOffsetTimesUs.clear(); + mDiscontinuityAbsStartTimesUs.clear(); + + if (mSeekReplyID != NULL) { + CHECK(mSeekReply != NULL); + mSeekReply->setInt32("err", OK); + mSeekReply->postReply(mSeekReplyID); + mSeekReplyID.clear(); + mSeekReply.clear(); + } + + // restart buffer polling after seek becauese previous + // buffering position is no longer valid. + restartPollBuffering(); + } + uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); - // currently onChangeConfiguration2 is only called for seeking; - // remove the following CHECK if using it else where. - CHECK_EQ(resumeMask, 0); streamMask |= resumeMask; AString URIs[kMaxStreams]; @@ -1341,17 +1587,25 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { } } - // Determine which decoders to shutdown on the player side, - // a decoder has to be shutdown if either - // 1) its streamtype was active before but now longer isn't. - // or - // 2) its streamtype was already active and still is but the URI - // has changed. uint32_t changedMask = 0; for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { - if (((mStreamMask & streamMask & indexToType(i)) - && !(URIs[i] == mStreams[i].mUri)) - || (mStreamMask & ~streamMask & indexToType(i))) { + // stream URI could change even if onChangeConfiguration2 is only + // used for seek. Seek could happen during a bw switch, in this + // case bw switch will be cancelled, but the seekTo position will + // fetch from the new URI. + if ((mStreamMask & streamMask & indexToType(i)) + && !mStreams[i].mUri.empty() + && !(URIs[i] == mStreams[i].mUri)) { + ALOGV("stream %zu changed: oldURI %s, newURI %s", i, + mStreams[i].mUri.c_str(), URIs[i].c_str()); + sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i)); + source->queueDiscontinuity( + ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); + } + // Determine which decoders to shutdown on the player side, + // a decoder has to be shutdown if its streamtype was active + // before but now longer isn't. + if ((mStreamMask & ~streamMask & indexToType(i))) { changedMask |= indexToType(i); } } @@ -1372,7 +1626,7 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { notify->setInt32("changedMask", changedMask); msg->setWhat(kWhatChangeConfiguration3); - msg->setTarget(id()); + msg->setTarget(this); notify->setMessage("reply", msg); notify->post(); @@ -1387,6 +1641,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); + mNewStreamMask = streamMask | resumeMask; + int64_t timeUs; int32_t pickTrack; bool switching = false; @@ -1395,7 +1651,19 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { if (timeUs < 0ll) { if (!pickTrack) { - switching = true; + // mSwapMask contains streams that are in both old and new variant, + // (in mNewStreamMask & mStreamMask) but with different URIs + // (not in resumeMask). + // For example, old variant has video and audio in two separate + // URIs, and new variant has only audio with unchanged URI. mSwapMask + // should be 0 as there is nothing to swap. We only need to stop video, + // and resume audio. + mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask; + switching = (mSwapMask != 0); + if (!switching) { + ALOGV("#### Finishing Bandwidth Switch Early: %zd => %zd", + mOrigBandwidthIndex, mCurBandwidthIndex); + } } mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; } else { @@ -1412,47 +1680,18 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { } } - mNewStreamMask = streamMask | resumeMask; - if (switching) { - mSwapMask = mStreamMask & ~resumeMask; - } - // Of all existing fetchers: // * Resume fetchers that are still needed and assign them original packet sources. // * Mark otherwise unneeded fetchers for removal. ALOGV("resuming fetchers for mask 0x%08x", resumeMask); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { const AString &uri = mFetcherInfos.keyAt(i); - - sp<AnotherPacketSource> sources[kMaxStreams]; - for (size_t j = 0; j < kMaxStreams; ++j) { - if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { - sources[j] = mPacketSources.valueFor(indexToType(j)); - - if (j != kSubtitleIndex) { - ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); - sp<AnotherPacketSource> discontinuityQueue; - discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); - discontinuityQueue->queueDiscontinuity( - ATSParser::DISCONTINUITY_NONE, - NULL, - true); - } - } - } - - FetcherInfo &info = mFetcherInfos.editValueAt(i); - if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL - || sources[kSubtitleIndex] != NULL) { - info.mFetcher->startAsync( - sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); - } else { - info.mToBeRemoved = true; + if (!resumeFetcher(uri, resumeMask, timeUs)) { + mFetcherInfos.editValueAt(i).mToBeRemoved = true; } } // streamMask now only contains the types that need a new fetcher created. - if (streamMask != 0) { ALOGV("creating new fetchers for mask 0x%08x", streamMask); } @@ -1470,13 +1709,12 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); CHECK(fetcher != NULL); - int64_t startTimeUs = -1; - int64_t segmentStartTimeUs = -1ll; - int32_t discontinuitySeq = -1; + HLSTime startTime; + SeekMode seekMode = kSeekModeExactPosition; sp<AnotherPacketSource> sources[kMaxStreams]; - if (i == kSubtitleIndex) { - segmentStartTimeUs = latestMediaSegmentStartTimeUs(); + if (i == kSubtitleIndex || (!pickTrack && !switching)) { + startTime = latestMediaSegmentStartTime(); } // TRICKY: looping from i as earlier streams are already removed from streamMask @@ -1486,63 +1724,50 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sources[j] = mPacketSources.valueFor(indexToType(j)); if (timeUs >= 0) { - sources[j]->clear(); - startTimeUs = timeUs; - - sp<AnotherPacketSource> discontinuityQueue; - sp<AMessage> extra = new AMessage; - extra->setInt64("timeUs", timeUs); - discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); - discontinuityQueue->queueDiscontinuity( - ATSParser::DISCONTINUITY_TIME, extra, true); + startTime.mTimeUs = timeUs; } else { int32_t type; sp<AMessage> meta; - if (pickTrack) { - // selecting + if (!switching) { + // selecting, or adapting but no swap required meta = sources[j]->getLatestDequeuedMeta(); } else { - // adapting + // adapting and swap required meta = sources[j]->getLatestEnqueuedMeta(); - } - - if (meta != NULL && !meta->findInt32("discontinuity", &type)) { - int64_t tmpUs; - int64_t tmpSegmentUs; - - CHECK(meta->findInt64("timeUs", &tmpUs)); - CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs)); - if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) { - startTimeUs = tmpUs; - segmentStartTimeUs = tmpSegmentUs; - } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) { - startTimeUs = tmpUs; + if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { + // switching up + meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); } + } - int32_t seq; - CHECK(meta->findInt32("discontinuitySeq", &seq)); - if (discontinuitySeq < 0 || seq < discontinuitySeq) { - discontinuitySeq = seq; + if (j != kSubtitleIndex && meta != NULL + && !meta->findInt32("discontinuity", &type)) { + HLSTime tmpTime(meta); + if (startTime < tmpTime) { + startTime = tmpTime; } } - if (pickTrack) { - // selecting track, queue discontinuities before content + if (!switching) { + // selecting, or adapting but no swap required sources[j]->clear(); if (j == kSubtitleIndex) { break; } - sp<AnotherPacketSource> discontinuityQueue; - discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); - discontinuityQueue->queueDiscontinuity( - ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); + + ALOGV("stream[%zu]: queue format change", j); + sources[j]->queueDiscontinuity( + ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); } else { - // adapting, queue discontinuities after resume + // switching, queue discontinuities after resume sources[j] = mPacketSources2.valueFor(indexToType(j)); sources[j]->clear(); - uint32_t extraStreams = mNewStreamMask & (~mStreamMask); - if (extraStreams & indexToType(j)) { - sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); + // the new fetcher might be providing streams that used to be + // provided by two different fetchers, if one of the fetcher + // paused in the middle while the other somehow paused in next + // seg, we have to start from next seg. + if (seekMode < mStreams[j].mSeekMode) { + seekMode = mStreams[j].mSeekMode; } } } @@ -1551,54 +1776,89 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { } } + // Set the target segment start time to the middle point of the + // segment where the last sample was. + // This gives a better guess if segments of the two variants are not + // perfectly aligned. (If the corresponding segment in new variant + // starts slightly later than that in the old variant, we still want + // to pick that segment, not the one before) fetcher->startAsync( sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], - startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, - segmentStartTimeUs, - discontinuitySeq, - switching); + startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs, + startTime.getSegmentTimeUs(true /* midpoint */), + startTime.mSeq, + seekMode); } // All fetchers have now been started, the configuration change // has completed. - cancelCheckBandwidthEvent(); - scheduleCheckBandwidthEvent(); - ALOGV("XXX configuration change completed."); mReconfigurationInProgress = false; if (switching) { mSwitchInProgress = true; } else { mStreamMask = mNewStreamMask; + mOrigBandwidthIndex = mCurBandwidthIndex; } - if (mDisconnectReplyID != 0) { + if (mDisconnectReplyID != NULL) { finishDisconnect(); } } -void LiveSession::onSwapped(const sp<AMessage> &msg) { - int32_t switchGeneration; - CHECK(msg->findInt32("switchGeneration", &switchGeneration)); - if (switchGeneration != mSwitchGeneration) { +void LiveSession::swapPacketSource(StreamType stream) { + ALOGV("swapPacketSource: stream = %d", stream); + + // transfer packets from source2 to source + sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); + sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); + + // queue discontinuity in mPacketSource + aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); + + // queue packets in mPacketSource2 to mPacketSource + status_t finalResult = OK; + sp<ABuffer> accessUnit; + while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && + OK == aps2->dequeueAccessUnit(&accessUnit)) { + aps->queueAccessUnit(accessUnit); + } + aps2->clear(); +} + +void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { + if (!mSwitchInProgress) { + return; + } + + ssize_t index = mFetcherInfos.indexOfKey(oldUri); + if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { return; } - int32_t stream; - CHECK(msg->findInt32("stream", &stream)); + // Swap packet source of streams provided by old variant + for (size_t idx = 0; idx < kMaxStreams; idx++) { + StreamType stream = indexToType(idx); + if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { + swapPacketSource(stream); - ssize_t idx = typeToIndex(stream); - CHECK(idx >= 0); - if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { - ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); + if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { + ALOGW("swapping stream type %d %s to empty stream", + stream, mStreams[idx].mUri.c_str()); + } + mStreams[idx].mUri = mStreams[idx].mNewUri; + mStreams[idx].mNewUri.clear(); + + mSwapMask &= ~stream; + } } - mStreams[idx].mUri = mStreams[idx].mNewUri; - mStreams[idx].mNewUri.clear(); - mSwapMask &= ~stream; + mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); + + ALOGV("tryToFinishBandwidthSwitch: mSwapMask=%x", mSwapMask); if (mSwapMask != 0) { return; } @@ -1606,155 +1866,308 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) { // Check if new variant contains extra streams. uint32_t extraStreams = mNewStreamMask & (~mStreamMask); while (extraStreams) { - StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); - swapPacketSource(extraStream); - extraStreams &= ~extraStream; + StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); + extraStreams &= ~stream; - idx = typeToIndex(extraStream); + swapPacketSource(stream); + + ssize_t idx = typeToIndex(stream); CHECK(idx >= 0); if (mStreams[idx].mNewUri.empty()) { ALOGW("swapping extra stream type %d %s to empty stream", - extraStream, mStreams[idx].mUri.c_str()); + stream, mStreams[idx].mUri.c_str()); } mStreams[idx].mUri = mStreams[idx].mNewUri; mStreams[idx].mNewUri.clear(); } - tryToFinishBandwidthSwitch(); -} - -void LiveSession::onCheckSwitchDown() { - if (mSwitchDownMonitor == NULL) { - return; + // Restart new fetcher (it was paused after the first 47k block) + // and let it fetch into mPacketSources (not mPacketSources2) + for (size_t i = 0; i < mFetcherInfos.size(); ++i) { + FetcherInfo &info = mFetcherInfos.editValueAt(i); + if (info.mToBeResumed) { + resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); + info.mToBeResumed = false; + } } - if (mSwitchInProgress || mReconfigurationInProgress) { - ALOGV("Switch/Reconfig in progress, defer switch down"); - mSwitchDownMonitor->post(1000000ll); - return; - } + ALOGI("#### Finished Bandwidth Switch: %zd => %zd", + mOrigBandwidthIndex, mCurBandwidthIndex); - for (size_t i = 0; i < kMaxStreams; ++i) { - int32_t targetDuration; - sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); - sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); + mStreamMask = mNewStreamMask; + mSwitchInProgress = false; + mOrigBandwidthIndex = mCurBandwidthIndex; - if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { - int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); - int64_t targetDurationUs = targetDuration * 1000000ll; + restartPollBuffering(); +} - if (bufferedDurationUs < targetDurationUs / 3) { - (new AMessage(kWhatSwitchDown, id()))->post(); - break; - } - } - } +void LiveSession::schedulePollBuffering() { + sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); + msg->setInt32("generation", mPollBufferingGeneration); + msg->post(1000000ll); +} - mSwitchDownMonitor->post(1000000ll); +void LiveSession::cancelPollBuffering() { + ++mPollBufferingGeneration; + mPrevBufferPercentage = -1; } -void LiveSession::onSwitchDown() { - if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { - return; - } +void LiveSession::restartPollBuffering() { + cancelPollBuffering(); + onPollBuffering(); +} + +void LiveSession::onPollBuffering() { + ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " + "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", + mSwitchInProgress, mReconfigurationInProgress, + mInPreparationPhase, mCurBandwidthIndex, mStreamMask); + + bool underflow, ready, down, up; + if (checkBuffering(underflow, ready, down, up)) { + if (mInPreparationPhase && ready) { + postPrepared(OK); + } + + // don't switch before we report prepared + if (!mInPreparationPhase) { + if (ready) { + stopBufferingIfNecessary(); + } else if (underflow) { + startBufferingIfNecessary(); + } + switchBandwidthIfNeeded(up, down); + } - ssize_t bandwidthIndex = getBandwidthIndex(); - if (bandwidthIndex < mCurBandwidthIndex) { - changeConfiguration(-1, bandwidthIndex, false); - return; } + schedulePollBuffering(); } -// Mark switch done when: -// 1. all old buffers are swapped out -void LiveSession::tryToFinishBandwidthSwitch() { +void LiveSession::cancelBandwidthSwitch(bool resume) { + ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", + mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); if (!mSwitchInProgress) { return; } - bool needToRemoveFetchers = false; for (size_t i = 0; i < mFetcherInfos.size(); ++i) { - if (mFetcherInfos.valueAt(i).mToBeRemoved) { - needToRemoveFetchers = true; - break; + FetcherInfo& info = mFetcherInfos.editValueAt(i); + if (info.mToBeRemoved) { + info.mToBeRemoved = false; + if (resume) { + resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); + } } } - if (!needToRemoveFetchers && mSwapMask == 0) { - ALOGI("mSwitchInProgress = false"); - mStreamMask = mNewStreamMask; - mSwitchInProgress = false; + for (size_t i = 0; i < kMaxStreams; ++i) { + AString newUri = mStreams[i].mNewUri; + if (!newUri.empty()) { + // clear all mNewUri matching this newUri + for (size_t j = i; j < kMaxStreams; ++j) { + if (mStreams[j].mNewUri == newUri) { + mStreams[j].mNewUri.clear(); + } + } + ALOGV("stopping newUri = %s", newUri.c_str()); + ssize_t index = mFetcherInfos.indexOfKey(newUri); + if (index < 0) { + ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); + continue; + } + FetcherInfo &info = mFetcherInfos.editValueAt(index); + info.mToBeRemoved = true; + info.mFetcher->stopAsync(); + } } -} -void LiveSession::scheduleCheckBandwidthEvent() { - sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); - msg->setInt32("generation", mCheckBandwidthGeneration); - msg->post(10000000ll); -} - -void LiveSession::cancelCheckBandwidthEvent() { - ++mCheckBandwidthGeneration; -} + ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", + mCurBandwidthIndex, mOrigBandwidthIndex); -void LiveSession::cancelBandwidthSwitch() { - Mutex::Autolock lock(mSwapMutex); mSwitchGeneration++; mSwitchInProgress = false; + mCurBandwidthIndex = mOrigBandwidthIndex; mSwapMask = 0; +} - for (size_t i = 0; i < mFetcherInfos.size(); ++i) { - FetcherInfo& info = mFetcherInfos.editValueAt(i); - if (info.mToBeRemoved) { - info.mToBeRemoved = false; - } +bool LiveSession::checkBuffering( + bool &underflow, bool &ready, bool &down, bool &up) { + underflow = ready = down = up = false; + + if (mReconfigurationInProgress) { + ALOGV("Switch/Reconfig in progress, defer buffer polling"); + return false; } - for (size_t i = 0; i < kMaxStreams; ++i) { - if (!mStreams[i].mNewUri.empty()) { - ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); - if (j < 0) { - mStreams[i].mNewUri.clear(); - continue; + size_t activeCount, underflowCount, readyCount, downCount, upCount; + activeCount = underflowCount = readyCount = downCount = upCount =0; + int32_t minBufferPercent = -1; + int64_t durationUs; + if (getDuration(&durationUs) != OK) { + durationUs = -1; + } + for (size_t i = 0; i < mPacketSources.size(); ++i) { + // we don't check subtitles for buffering level + if (!(mStreamMask & mPacketSources.keyAt(i) + & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { + continue; + } + // ignore streams that never had any packet queued. + // (it's possible that the variant only has audio or video) + sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); + if (meta == NULL) { + continue; + } + + int64_t bufferedDurationUs = + mPacketSources[i]->getEstimatedDurationUs(); + ALOGV("source[%zu]: buffered %lld us", i, (long long)bufferedDurationUs); + if (durationUs >= 0) { + int32_t percent; + if (mPacketSources[i]->isFinished(0 /* duration */)) { + percent = 100; + } else { + percent = (int32_t)(100.0 * (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); + } + if (minBufferPercent < 0 || percent < minBufferPercent) { + minBufferPercent = percent; } + } - const FetcherInfo &info = mFetcherInfos.valueAt(j); - info.mFetcher->stopAsync(); - mFetcherInfos.removeItemsAt(j); - mStreams[i].mNewUri.clear(); + ++activeCount; + int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs; + if (bufferedDurationUs > readyMark + || mPacketSources[i]->isFinished(0)) { + ++readyCount; + } + if (!mPacketSources[i]->isFinished(0)) { + if (bufferedDurationUs < kUnderflowMarkUs) { + ++underflowCount; + } + if (bufferedDurationUs > mUpSwitchMark) { + ++upCount; + } + if (bufferedDurationUs < mDownSwitchMark) { + ++downCount; + } } } -} -bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { - if (mReconfigurationInProgress || mSwitchInProgress) { - return false; + if (minBufferPercent >= 0) { + notifyBufferingUpdate(minBufferPercent); } - if (mCurBandwidthIndex < 0) { + if (activeCount > 0) { + up = (upCount == activeCount); + down = (downCount > 0); + ready = (readyCount == activeCount); + underflow = (underflowCount > 0); return true; } - if (bandwidthIndex == (size_t)mCurBandwidthIndex) { - return false; - } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { - return canSwitchUp(); - } else { - return true; + return false; +} + +void LiveSession::startBufferingIfNecessary() { + ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", + mInPreparationPhase, mBuffering); + if (!mBuffering) { + mBuffering = true; + + sp<AMessage> notify = mNotify->dup(); + notify->setInt32("what", kWhatBufferingStart); + notify->post(); } } -void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { - size_t bandwidthIndex = getBandwidthIndex(); - if (canSwitchBandwidthTo(bandwidthIndex)) { - changeConfiguration(-1ll /* timeUs */, bandwidthIndex); +void LiveSession::stopBufferingIfNecessary() { + ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", + mInPreparationPhase, mBuffering); + + if (mBuffering) { + mBuffering = false; + + sp<AMessage> notify = mNotify->dup(); + notify->setInt32("what", kWhatBufferingEnd); + notify->post(); + } +} + +void LiveSession::notifyBufferingUpdate(int32_t percentage) { + if (percentage < mPrevBufferPercentage) { + percentage = mPrevBufferPercentage; + } else if (percentage > 100) { + percentage = 100; + } + + mPrevBufferPercentage = percentage; + + ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); + + sp<AMessage> notify = mNotify->dup(); + notify->setInt32("what", kWhatBufferingUpdate); + notify->setInt32("percentage", percentage); + notify->post(); +} + +void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { + // no need to check bandwidth if we only have 1 bandwidth settings + if (mSwitchInProgress || mBandwidthItems.size() < 2) { + return; + } + + int32_t bandwidthBps; + if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) { + ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); + mLastBandwidthBps = bandwidthBps; } else { - // Come back and check again 10 seconds later in case there is nothing to do now. - // If we DO change configuration, once that completes it'll schedule a new - // check bandwidth event with an incremented mCheckBandwidthGeneration. - msg->post(10000000ll); + ALOGV("no bandwidth estimate."); + return; } + + int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; + // canSwithDown and canSwitchUp can't both be true. + // we only want to switch up when measured bw is 120% higher than current variant, + // and we only want to switch down when measured bw is below current variant. + bool canSwithDown = bufferLow + && (bandwidthBps < (int32_t)curBandwidth); + bool canSwitchUp = bufferHigh + && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); + + if (canSwithDown || canSwitchUp) { + ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); + + // it's possible that we're checking for canSwitchUp case, but the returned + // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% + // of measured bw. In that case we don't want to do anything, since we have + // both enough buffer and enough bw. + if (bandwidthIndex == mCurBandwidthIndex + || (canSwitchUp && bandwidthIndex < mCurBandwidthIndex) + || (canSwithDown && bandwidthIndex > mCurBandwidthIndex)) { + return; + } + + ALOGI("#### Starting Bandwidth Switch: %zd => %zd", + mCurBandwidthIndex, bandwidthIndex); + changeConfiguration(-1, bandwidthIndex, false); + } +} + +void LiveSession::postError(status_t err) { + // if we reached EOS, notify buffering of 100% + if (err == ERROR_END_OF_STREAM) { + notifyBufferingUpdate(100); + } + // we'll stop buffer polling now, before that notify + // stop buffering to stop the spinning icon + stopBufferingIfNecessary(); + cancelPollBuffering(); + + sp<AMessage> notify = mNotify->dup(); + notify->setInt32("what", kWhatError); + notify->setInt32("err", err); + notify->post(); } void LiveSession::postPrepared(status_t err) { @@ -1764,6 +2177,8 @@ void LiveSession::postPrepared(status_t err) { if (err == OK || err == ERROR_END_OF_STREAM) { notify->setInt32("what", kWhatPrepared); } else { + cancelPollBuffering(); + notify->setInt32("what", kWhatPreparationFailed); notify->setInt32("err", err); } @@ -1771,10 +2186,8 @@ void LiveSession::postPrepared(status_t err) { notify->post(); mInPreparationPhase = false; - - mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); - mSwitchDownMonitor->post(); } + } // namespace android |