/* * Copyright (C) 2010 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "LiveSession" #include #include "LiveSession.h" #include "HTTPDownloader.h" #include "M3UParser.h" #include "PlaylistFetcher.h" #include "mpeg2ts/AnotherPacketSource.h" #include #include #include #include #include #include #include #include #include #include #include #include namespace android { // static // Bandwidth Switch Mark Defaults const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; const int64_t LiveSession::kResumeThresholdUs = 100000ll; // Buffer Prepare/Ready/Underflow Marks const int64_t LiveSession::kReadyMarkUs = 5000000ll; const int64_t LiveSession::kPrepareMarkUs = 1500000ll; const int64_t LiveSession::kUnderflowMarkUs = 1000000ll; struct LiveSession::BandwidthEstimator : public LiveSession::BandwidthBaseEstimator { BandwidthEstimator(); void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); bool estimateBandwidth( int32_t *bandwidth, bool *isStable = NULL, int32_t *shortTermBps = NULL); private: // Bandwidth estimation parameters static const int32_t kShortTermBandwidthItems = 3; static const int32_t kMinBandwidthHistoryItems = 20; static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec struct BandwidthEntry { int64_t mTimestampUs; int64_t mDelayUs; size_t mNumBytes; }; Mutex mLock; List mBandwidthHistory; List mPrevEstimates; int32_t mShortTermEstimate; bool mHasNewSample; bool mIsStable; int64_t mTotalTransferTimeUs; size_t mTotalTransferBytes; DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); }; LiveSession::BandwidthEstimator::BandwidthEstimator() : mShortTermEstimate(0), mHasNewSample(false), mIsStable(true), mTotalTransferTimeUs(0), mTotalTransferBytes(0) { } void LiveSession::BandwidthEstimator::addBandwidthMeasurement( size_t numBytes, int64_t delayUs) { AutoMutex autoLock(mLock); int64_t nowUs = ALooper::GetNowUs(); BandwidthEntry entry; entry.mTimestampUs = nowUs; entry.mDelayUs = delayUs; entry.mNumBytes = numBytes; mTotalTransferTimeUs += delayUs; mTotalTransferBytes += numBytes; mBandwidthHistory.push_back(entry); mHasNewSample = true; // Remove no more than 10% of total transfer time at a time // to avoid sudden jump on bandwidth estimation. There might // be long blocking reads that takes up signification time, // we have to keep a longer window in that case. int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10; if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) { bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs; } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) { bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs; } // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, // and total transfer time at least kMaxBandwidthHistoryWindowUs. while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) { List::iterator it = mBandwidthHistory.begin(); // remove sample if either absolute age or total transfer time is // over kMaxBandwidthHistoryWindowUs if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs && mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) { break; } mTotalTransferTimeUs -= it->mDelayUs; mTotalTransferBytes -= it->mNumBytes; mBandwidthHistory.erase(mBandwidthHistory.begin()); } } bool LiveSession::BandwidthEstimator::estimateBandwidth( int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) { AutoMutex autoLock(mLock); if (mBandwidthHistory.size() < 2) { return false; } if (!mHasNewSample) { *bandwidthBps = *(--mPrevEstimates.end()); if (isStable) { *isStable = mIsStable; } if (shortTermBps) { *shortTermBps = mShortTermEstimate; } return true; } *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); mPrevEstimates.push_back(*bandwidthBps); while (mPrevEstimates.size() > 3) { mPrevEstimates.erase(mPrevEstimates.begin()); } mHasNewSample = false; int64_t totalTimeUs = 0; size_t totalBytes = 0; if (mBandwidthHistory.size() >= kShortTermBandwidthItems) { List::iterator it = --mBandwidthHistory.end(); for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) { totalTimeUs += it->mDelayUs; totalBytes += it->mNumBytes; } } mShortTermEstimate = totalTimeUs > 0 ? (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps; if (shortTermBps) { *shortTermBps = mShortTermEstimate; } int32_t minEstimate = -1, maxEstimate = -1; List::iterator it; for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) { int32_t estimate = *it; if (minEstimate < 0 || minEstimate > estimate) { minEstimate = estimate; } if (maxEstimate < 0 || maxEstimate < estimate) { maxEstimate = estimate; } } // consider it stable if long-term average is not jumping a lot // and short-term average is not much lower than long-term average mIsStable = (maxEstimate <= minEstimate * 4 / 3) && mShortTermEstimate > minEstimate * 7 / 10; if (isStable) { *isStable = mIsStable; } #if 0 { char dumpStr[1024] = {0}; size_t itemIdx = 0; size_t histSize = mBandwidthHistory.size(); sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {", *bandwidthBps, mIsStable, histSize); List::iterator it = mBandwidthHistory.begin(); for (; it != mBandwidthHistory.end(); ++it) { if (itemIdx > 50) { sprintf(dumpStr + strlen(dumpStr), "...(%zd more items)... }", histSize - itemIdx); break; } sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s", it->mNumBytes / 1024, (double)it->mDelayUs * 1.0e-6, (it == (--mBandwidthHistory.end())) ? "}" : ", "); itemIdx++; } ALOGE(dumpStr); } #endif return true; } //static const char *LiveSession::getKeyForStream(StreamType type) { switch (type) { case STREAMTYPE_VIDEO: return "timeUsVideo"; case STREAMTYPE_AUDIO: return "timeUsAudio"; case STREAMTYPE_SUBTITLES: return "timeUsSubtitle"; case STREAMTYPE_METADATA: return "timeUsMetadata"; // unused default: TRESPASS(); } return NULL; } //static const char *LiveSession::getNameForStream(StreamType type) { switch (type) { case STREAMTYPE_VIDEO: return "video"; case STREAMTYPE_AUDIO: return "audio"; case STREAMTYPE_SUBTITLES: return "subs"; case STREAMTYPE_METADATA: return "metadata"; default: break; } return "unknown"; } //static ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) { switch (type) { case STREAMTYPE_VIDEO: return ATSParser::VIDEO; case STREAMTYPE_AUDIO: return ATSParser::AUDIO; case STREAMTYPE_METADATA: return ATSParser::META; case STREAMTYPE_SUBTITLES: default: TRESPASS(); } return ATSParser::NUM_SOURCE_TYPES; // should not reach here } LiveSession::LiveSession( const sp ¬ify, uint32_t flags, const sp &httpService) : mNotify(notify), mFlags(flags), mHTTPService(httpService), mBuffering(false), mInPreparationPhase(true), mPollBufferingGeneration(0), mPrevBufferPercentage(-1), mCurBandwidthIndex(-1), mOrigBandwidthIndex(-1), mLastBandwidthBps(-1ll), mLastBandwidthStable(false), mBandwidthEstimator(new BandwidthEstimator()), mMaxWidth(720), mMaxHeight(480), mStreamMask(0), mNewStreamMask(0), mSwapMask(0), mSwitchGeneration(0), mSubtitleGeneration(0), mLastDequeuedTimeUs(0ll), mRealTimeBaseUs(0ll), mReconfigurationInProgress(false), mSwitchInProgress(false), mUpSwitchMark(kUpSwitchMarkUs), mDownSwitchMark(kDownSwitchMarkUs), mUpSwitchMargin(kUpSwitchMarginUs), mFirstTimeUsValid(false), mFirstTimeUs(0), mLastSeekTimeUs(0), mHasMetadata(false) { mStreams[kAudioIndex] = StreamItem("audio"); mStreams[kVideoIndex] = StreamItem("video"); mStreams[kSubtitleIndex] = StreamItem("subtitles"); for (size_t i = 0; i < kNumSources; ++i) { mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); } } LiveSession::~LiveSession() { if (mFetcherLooper != NULL) { mFetcherLooper->stop(); } } int64_t LiveSession::calculateMediaTimeUs( int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) { if (timeUs >= firstTimeUs) { timeUs -= firstTimeUs; } else { timeUs = 0; } timeUs += mLastSeekTimeUs; if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); } return timeUs; } status_t LiveSession::dequeueAccessUnit( StreamType stream, sp *accessUnit) { status_t finalResult = OK; sp packetSource = mPacketSources.valueFor(stream); ssize_t streamIdx = typeToIndex(stream); if (streamIdx < 0) { return BAD_VALUE; } const char *streamStr = getNameForStream(stream); // Do not let client pull data if we don't have data packets yet. // We might only have a format discontinuity queued without data. // When NuPlayerDecoder dequeues the format discontinuity, it will // immediately try to getFormat. If we return NULL, NuPlayerDecoder // thinks it can do seamless change, so will not shutdown decoder. // When the actual format arrives, it can't handle it and get stuck. if (!packetSource->hasDataBufferAvailable(&finalResult)) { ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)", streamStr, finalResult); if (finalResult == OK) { return -EAGAIN; } else { return finalResult; } } // Let the client dequeue as long as we have buffers available // Do not make pause/resume decisions here. status_t err = packetSource->dequeueAccessUnit(accessUnit); if (err == INFO_DISCONTINUITY) { // adaptive streaming, discontinuities in the playlist int32_t type; CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); sp extra; if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { extra.clear(); } ALOGI("[%s] read discontinuity of type %d, extra = %s", streamStr, type, extra == NULL ? "NULL" : extra->debugString().c_str()); } else if (err == OK) { if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { int64_t timeUs, originalTimeUs; int32_t discontinuitySeq = 0; StreamItem& strm = mStreams[streamIdx]; CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); originalTimeUs = timeUs; (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { int64_t offsetTimeUs; if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); } else { offsetTimeUs = 0; } if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0 && strm.mLastDequeuedTimeUs >= 0) { int64_t firstTimeUs; firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; offsetTimeUs += strm.mLastSampleDurationUs; } else { offsetTimeUs += strm.mLastSampleDurationUs; } mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); strm.mCurDiscontinuitySeq = discontinuitySeq; } int32_t discard = 0; int64_t firstTimeUs; if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { int64_t durUs; // approximate sample duration if (timeUs > strm.mLastDequeuedTimeUs) { durUs = timeUs - strm.mLastDequeuedTimeUs; } else { durUs = strm.mLastDequeuedTimeUs - timeUs; } strm.mLastSampleDurationUs = durUs; firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { firstTimeUs = timeUs; } else { mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); firstTimeUs = timeUs; } strm.mLastDequeuedTimeUs = timeUs; timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq); ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us", streamStr, (long long)timeUs, (long long)originalTimeUs); (*accessUnit)->meta()->setInt64("timeUs", timeUs); mLastDequeuedTimeUs = timeUs; mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; } else if (stream == STREAMTYPE_SUBTITLES) { int32_t subtitleGeneration; if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) && subtitleGeneration != mSubtitleGeneration) { return -EAGAIN; }; (*accessUnit)->meta()->setInt32( "trackIndex", mPlaylist->getSelectedIndex()); (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); } else if (stream == STREAMTYPE_METADATA) { HLSTime mdTime((*accessUnit)->meta()); if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) { packetSource->requeueAccessUnit((*accessUnit)); return -EAGAIN; } else { int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq); int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq); (*accessUnit)->meta()->setInt64("timeUs", timeUs); (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); } } } else { ALOGI("[%s] encountered error %d", streamStr, err); } return err; } status_t LiveSession::getStreamFormat(StreamType stream, sp *format) { if (!(mStreamMask & stream)) { return UNKNOWN_ERROR; } sp packetSource = mPacketSources.valueFor(stream); sp meta = packetSource->getFormat(); if (meta == NULL) { return -EAGAIN; } if (stream == STREAMTYPE_AUDIO) { // set AAC input buffer size to 32K bytes (256kbps x 1sec) meta->setInt32(kKeyMaxInputSize, 32 * 1024); } else if (stream == STREAMTYPE_VIDEO) { meta->setInt32(kKeyMaxWidth, mMaxWidth); meta->setInt32(kKeyMaxHeight, mMaxHeight); } return convertMetaDataToMessage(meta, format); } sp LiveSession::getHTTPDownloader() { return new HTTPDownloader(mHTTPService, mExtraHeaders); } void LiveSession::connectAsync( const char *url, const KeyedVector *headers) { sp msg = new AMessage(kWhatConnect, this); msg->setString("url", url); if (headers != NULL) { msg->setPointer( "headers", new KeyedVector(*headers)); } msg->post(); } status_t LiveSession::disconnect() { sp msg = new AMessage(kWhatDisconnect, this); sp response; status_t err = msg->postAndAwaitResponse(&response); return err; } status_t LiveSession::seekTo(int64_t timeUs) { sp msg = new AMessage(kWhatSeek, this); msg->setInt64("timeUs", timeUs); sp response; status_t err = msg->postAndAwaitResponse(&response); return err; } bool LiveSession::checkSwitchProgress( sp &stopParams, int64_t delayUs, bool *needResumeUntil) { AString newUri; CHECK(stopParams->findString("uri", &newUri)); *needResumeUntil = false; sp firstNewMeta[kMaxStreams]; for (size_t i = 0; i < kMaxStreams; ++i) { StreamType stream = indexToType(i); if (!(mSwapMask & mNewStreamMask & stream) || (mStreams[i].mNewUri != newUri)) { continue; } if (stream == STREAMTYPE_SUBTITLES) { continue; } sp &source = mPacketSources.editValueAt(i); // First, get latest dequeued meta, which is where the decoder is at. // (when upswitching, we take the meta after a certain delay, so that // the decoder is left with some cushion) sp lastDequeueMeta, lastEnqueueMeta; if (delayUs > 0) { lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); if (lastDequeueMeta == NULL) { // this means we don't have enough cushion, try again later ALOGV("[%s] up switching failed due to insufficient buffer", getNameForStream(stream)); return false; } } else { // It's okay for lastDequeueMeta to be NULL here, it means the // decoder hasn't even started dequeueing lastDequeueMeta = source->getLatestDequeuedMeta(); } // Then, trim off packets at beginning of mPacketSources2 that's before // the latest dequeued time. These samples are definitely too late. firstNewMeta[i] = mPacketSources2.editValueAt(i) ->trimBuffersBeforeMeta(lastDequeueMeta); // Now firstNewMeta[i] is the first sample after the trim. // If it's NULL, we failed because dequeue already past all samples // in mPacketSource2, we have to try again. if (firstNewMeta[i] == NULL) { HLSTime dequeueTime(lastDequeueMeta); ALOGV("[%s] dequeue time (%d, %lld) past start time", getNameForStream(stream), dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); return false; } // Otherwise, we check if mPacketSources2 overlaps with what old fetcher // already fetched, and see if we need to resumeUntil lastEnqueueMeta = source->getLatestEnqueuedMeta(); // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity // boundary, no need to resume as the content will look different anyways if (lastEnqueueMeta != NULL) { HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); // no need to resume old fetcher if new fetcher started in different // discontinuity sequence, as the content will look different. *needResumeUntil |= (startTime.mSeq == lastTime.mSeq && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); // update the stopTime for resumeUntil stopParams->setInt32("discontinuitySeq", startTime.mSeq); stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); } } // if we're here, it means dequeue progress hasn't passed some samples in // mPacketSource2, we can trim off the excess in mPacketSource. // (old fetcher might still need to resumeUntil the start time of new fetcher) for (size_t i = 0; i < kMaxStreams; ++i) { StreamType stream = indexToType(i); if (!(mSwapMask & mNewStreamMask & stream) || (newUri != mStreams[i].mNewUri) || stream == STREAMTYPE_SUBTITLES) { continue; } mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); } // no resumeUntil if already underflow *needResumeUntil &= !mBuffering; return true; } void LiveSession::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatConnect: { onConnect(msg); break; } case kWhatDisconnect: { CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); if (mReconfigurationInProgress) { break; } finishDisconnect(); break; } case kWhatSeek: { if (mReconfigurationInProgress) { msg->post(50000); break; } CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); mSeekReply = new AMessage; onSeek(msg); break; } case kWhatFetcherNotify: { int32_t what; CHECK(msg->findInt32("what", &what)); switch (what) { case PlaylistFetcher::kWhatStarted: break; case PlaylistFetcher::kWhatPaused: case PlaylistFetcher::kWhatStopped: { AString uri; CHECK(msg->findString("uri", &uri)); ssize_t index = mFetcherInfos.indexOfKey(uri); if (index < 0) { // ignore msgs from fetchers that's already gone break; } ALOGV("fetcher-%d %s", mFetcherInfos[index].mFetcher->getFetcherID(), what == PlaylistFetcher::kWhatPaused ? "paused" : "stopped"); if (what == PlaylistFetcher::kWhatStopped) { mFetcherLooper->unregisterHandler( mFetcherInfos[index].mFetcher->id()); mFetcherInfos.removeItemsAt(index); } else if (what == PlaylistFetcher::kWhatPaused) { int32_t seekMode; CHECK(msg->findInt32("seekMode", &seekMode)); for (size_t i = 0; i < kMaxStreams; ++i) { if (mStreams[i].mUri == uri) { mStreams[i].mSeekMode = (SeekMode) seekMode; } } } if (mContinuation != NULL) { CHECK_GT(mContinuationCounter, 0); if (--mContinuationCounter == 0) { mContinuation->post(); } ALOGV("%zu fetcher(s) left", mContinuationCounter); } break; } case PlaylistFetcher::kWhatDurationUpdate: { AString uri; CHECK(msg->findString("uri", &uri)); int64_t durationUs; CHECK(msg->findInt64("durationUs", &durationUs)); ssize_t index = mFetcherInfos.indexOfKey(uri); if (index >= 0) { FetcherInfo *info = &mFetcherInfos.editValueFor(uri); info->mDurationUs = durationUs; } break; } case PlaylistFetcher::kWhatTargetDurationUpdate: { int64_t targetDurationUs; CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); break; } case PlaylistFetcher::kWhatError: { status_t err; CHECK(msg->findInt32("err", &err)); ALOGE("XXX Received error %d from PlaylistFetcher.", err); // handle EOS on subtitle tracks independently AString uri; if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { ssize_t i = mFetcherInfos.indexOfKey(uri); if (i >= 0) { const sp &fetcher = mFetcherInfos.valueAt(i).mFetcher; if (fetcher != NULL) { uint32_t type = fetcher->getStreamTypeMask(); if (type == STREAMTYPE_SUBTITLES) { mPacketSources.valueFor( STREAMTYPE_SUBTITLES)->signalEOS(err);; break; } } } } // remember the failure index (as mCurBandwidthIndex will be restored // after cancelBandwidthSwitch()), and record last fail time size_t failureIndex = mCurBandwidthIndex; mBandwidthItems.editItemAt( failureIndex).mLastFailureUs = ALooper::GetNowUs(); if (mSwitchInProgress) { // if error happened when we switch to a variant, try fallback // to other variant to save the session if (tryBandwidthFallback()) { break; } } if (mInPreparationPhase) { postPrepared(err); } cancelBandwidthSwitch(); mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); mPacketSources.valueFor( STREAMTYPE_SUBTITLES)->signalEOS(err); postError(err); break; } case PlaylistFetcher::kWhatStopReached: { ALOGV("kWhatStopReached"); AString oldUri; CHECK(msg->findString("uri", &oldUri)); ssize_t index = mFetcherInfos.indexOfKey(oldUri); if (index < 0) { break; } tryToFinishBandwidthSwitch(oldUri); break; } case PlaylistFetcher::kWhatStartedAt: { int32_t switchGeneration; CHECK(msg->findInt32("switchGeneration", &switchGeneration)); ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d", switchGeneration, mSwitchGeneration); if (switchGeneration != mSwitchGeneration) { break; } AString uri; CHECK(msg->findString("uri", &uri)); // mark new fetcher mToBeResumed ssize_t index = mFetcherInfos.indexOfKey(uri); if (index >= 0) { mFetcherInfos.editValueAt(index).mToBeResumed = true; } // temporarily disable packet sources to be swapped to prevent // NuPlayerDecoder from dequeuing while we check progress for (size_t i = 0; i < mPacketSources.size(); ++i) { if ((mSwapMask & mPacketSources.keyAt(i)) && uri == mStreams[i].mNewUri) { mPacketSources.editValueAt(i)->enable(false); } } bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); // If switching up, require a cushion bigger than kUnderflowMark // to avoid buffering immediately after the switch. // (If we don't have that cushion we'd rather cancel and try again.) int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0; bool needResumeUntil = false; sp stopParams = msg; if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { // playback time hasn't passed startAt time if (!needResumeUntil) { ALOGV("finish switch"); for (size_t i = 0; i < kMaxStreams; ++i) { if ((mSwapMask & indexToType(i)) && uri == mStreams[i].mNewUri) { // have to make a copy of mStreams[i].mUri because // tryToFinishBandwidthSwitch is modifying mStreams[] AString oldURI = mStreams[i].mUri; tryToFinishBandwidthSwitch(oldURI); break; } } } else { // startAt time is after last enqueue time // Resume fetcher for the original variant; the resumed fetcher should // continue until the timestamps found in msg, which is stored by the // new fetcher to indicate where the new variant has started buffering. ALOGV("finish switch with resumeUntilAsync"); for (size_t i = 0; i < mFetcherInfos.size(); i++) { const FetcherInfo &info = mFetcherInfos.valueAt(i); if (info.mToBeRemoved) { info.mFetcher->resumeUntilAsync(stopParams); } } } } else { // playback time passed startAt time if (switchUp) { // if switching up, cancel and retry if condition satisfies again ALOGV("cancel up switch because we're too late"); cancelBandwidthSwitch(true /* resume */); } else { ALOGV("retry down switch at next sample"); resumeFetcher(uri, mSwapMask, -1, true /* newUri */); } } // re-enable all packet sources for (size_t i = 0; i < mPacketSources.size(); ++i) { mPacketSources.editValueAt(i)->enable(true); } break; } case PlaylistFetcher::kWhatPlaylistFetched: { onMasterPlaylistFetched(msg); break; } case PlaylistFetcher::kWhatMetadataDetected: { if (!mHasMetadata) { mHasMetadata = true; sp notify = mNotify->dup(); notify->setInt32("what", kWhatMetadataDetected); notify->post(); } break; } default: TRESPASS(); } break; } case kWhatChangeConfiguration: { onChangeConfiguration(msg); break; } case kWhatChangeConfiguration2: { onChangeConfiguration2(msg); break; } case kWhatChangeConfiguration3: { onChangeConfiguration3(msg); break; } case kWhatPollBuffering: { int32_t generation; CHECK(msg->findInt32("generation", &generation)); if (generation == mPollBufferingGeneration) { onPollBuffering(); } break; } default: TRESPASS(); break; } } // static bool LiveSession::isBandwidthValid(const BandwidthItem &item) { static const int64_t kBlacklistWindowUs = 300 * 1000000ll; return item.mLastFailureUs < 0 || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs; } // static int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { if (a->mBandwidth < b->mBandwidth) { return -1; } else if (a->mBandwidth == b->mBandwidth) { return 0; } return 1; } // static LiveSession::StreamType LiveSession::indexToType(int idx) { CHECK(idx >= 0 && idx < kNumSources); return (StreamType)(1 << idx); } // static ssize_t LiveSession::typeToIndex(int32_t type) { switch (type) { case STREAMTYPE_AUDIO: return 0; case STREAMTYPE_VIDEO: return 1; case STREAMTYPE_SUBTITLES: return 2; case STREAMTYPE_METADATA: return 3; default: return -1; }; return -1; } void LiveSession::onConnect(const sp &msg) { CHECK(msg->findString("url", &mMasterURL)); // TODO currently we don't know if we are coming here from incognito mode ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str()); KeyedVector *headers = NULL; if (!msg->findPointer("headers", (void **)&headers)) { mExtraHeaders.clear(); } else { mExtraHeaders = *headers; delete headers; headers = NULL; } // create looper for fetchers if (mFetcherLooper == NULL) { mFetcherLooper = new ALooper(); mFetcherLooper->setName("Fetcher"); mFetcherLooper->start(false, false); } // create fetcher to fetch the master playlist addFetcher(mMasterURL.c_str())->fetchPlaylistAsync(); } void LiveSession::onMasterPlaylistFetched(const sp &msg) { AString uri; CHECK(msg->findString("uri", &uri)); ssize_t index = mFetcherInfos.indexOfKey(uri); if (index < 0) { ALOGW("fetcher for master playlist is gone."); return; } // no longer useful, remove mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id()); mFetcherInfos.removeItemsAt(index); CHECK(msg->findObject("playlist", (sp *)&mPlaylist)); if (mPlaylist == NULL) { ALOGE("unable to fetch master playlist %s.", uriDebugString(mMasterURL).c_str()); postPrepared(ERROR_IO); return; } // We trust the content provider to make a reasonable choice of preferred // initial bandwidth by listing it first in the variant playlist. // At startup we really don't have a good estimate on the available // network bandwidth since we haven't tranferred any data yet. Once // we have we can make a better informed choice. size_t initialBandwidth = 0; size_t initialBandwidthIndex = 0; int32_t maxWidth = 0; int32_t maxHeight = 0; if (mPlaylist->isVariantPlaylist()) { Vector itemsWithVideo; for (size_t i = 0; i < mPlaylist->size(); ++i) { BandwidthItem item; item.mPlaylistIndex = i; item.mLastFailureUs = -1ll; sp meta; AString uri; mPlaylist->itemAt(i, &uri, &meta); CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); int32_t width, height; if (meta->findInt32("width", &width)) { maxWidth = max(maxWidth, width); } if (meta->findInt32("height", &height)) { maxHeight = max(maxHeight, height); } mBandwidthItems.push(item); if (mPlaylist->hasType(i, "video")) { itemsWithVideo.push(item); } } #if 0 // remove the audio-only variants if we have at least one with video if (!itemsWithVideo.empty() && itemsWithVideo.size() < mBandwidthItems.size()) { mBandwidthItems.clear(); for (size_t i = 0; i < itemsWithVideo.size(); ++i) { mBandwidthItems.push(itemsWithVideo[i]); } } #endif CHECK_GT(mBandwidthItems.size(), 0u); initialBandwidth = mBandwidthItems[0].mBandwidth; mBandwidthItems.sort(SortByBandwidth); for (size_t i = 0; i < mBandwidthItems.size(); ++i) { if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { initialBandwidthIndex = i; break; } } } else { // dummy item. BandwidthItem item; item.mPlaylistIndex = 0; item.mBandwidth = 0; mBandwidthItems.push(item); } mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth; mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight; mPlaylist->pickRandomMediaItems(); changeConfiguration( 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); } void LiveSession::finishDisconnect() { ALOGV("finishDisconnect"); // No reconfiguration is currently pending, make sure none will trigger // during disconnection either. cancelBandwidthSwitch(); // cancel buffer polling cancelPollBuffering(); // TRICKY: don't wait for all fetcher to be stopped when disconnecting // // Some fetchers might be stuck in connect/getSize at this point. These // operations will eventually timeout (as we have a timeout set in // MediaHTTPConnection), but we don't want to block the main UI thread // until then. Here we just need to make sure we clear all references // to the fetchers, so that when they finally exit from the blocking // operation, they can be destructed. // // There is one very tricky point though. For this scheme to work, the // fecther must hold a reference to LiveSession, so that LiveSession is // destroyed after fetcher. Otherwise LiveSession would get stuck in its // own destructor when it waits for mFetcherLooper to stop, which still // blocks main UI thread. for (size_t i = 0; i < mFetcherInfos.size(); ++i) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); mFetcherLooper->unregisterHandler( mFetcherInfos.valueAt(i).mFetcher->id()); } mFetcherInfos.clear(); mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); mPacketSources.valueFor( STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); sp response = new AMessage; response->setInt32("err", OK); response->postReply(mDisconnectReplyID); mDisconnectReplyID.clear(); } sp LiveSession::addFetcher(const char *uri) { ssize_t index = mFetcherInfos.indexOfKey(uri); if (index >= 0) { return NULL; } sp notify = new AMessage(kWhatFetcherNotify, this); notify->setString("uri", uri); notify->setInt32("switchGeneration", mSwitchGeneration); FetcherInfo info; info.mFetcher = new PlaylistFetcher( notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration); info.mDurationUs = -1ll; info.mToBeRemoved = false; info.mToBeResumed = false; mFetcherLooper->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); return info.mFetcher; } #if 0 static double uniformRand() { return (double)rand() / RAND_MAX; } #endif bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) { ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i, newUri ? "true" : "false", newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str()); return i >= 0 && ((!newUri && uri == mStreams[i].mUri) || (newUri && uri == mStreams[i].mNewUri)); } sp LiveSession::getPacketSourceForStreamIndex( size_t trackIndex, bool newUri) { StreamType type = indexToType(trackIndex); sp source = NULL; if (newUri) { source = mPacketSources2.valueFor(type); source->clear(); } else { source = mPacketSources.valueFor(type); }; return source; } sp LiveSession::getMetadataSource( sp sources[kNumSources], uint32_t streamMask, bool newUri) { // todo: One case where the following strategy can fail is when audio and video // are in separate playlists, both are transport streams, and the metadata // is actually contained in the audio stream. ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s", streamMask, newUri ? "true" : "false"); if ((sources[kVideoIndex] != NULL) // video fetcher; or ... || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) { // ... audio fetcher for audio only variant return getPacketSourceForStreamIndex(kMetaDataIndex, newUri); } return NULL; } bool LiveSession::resumeFetcher( const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { ssize_t index = mFetcherInfos.indexOfKey(uri); if (index < 0) { ALOGE("did not find fetcher for uri: %s", uri.c_str()); return false; } bool resume = false; sp sources[kNumSources]; for (size_t i = 0; i < kMaxStreams; ++i) { if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) { resume = true; sources[i] = getPacketSourceForStreamIndex(i, newUri); } } if (resume) { sp &fetcher = mFetcherInfos.editValueAt(index).mFetcher; SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d", fetcher->getFetcherID(), (long long)timeUs, seekMode); fetcher->startAsync( sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], getMetadataSource(sources, streamMask, newUri), timeUs, -1, -1, seekMode); } return resume; } float LiveSession::getAbortThreshold( ssize_t currentBWIndex, ssize_t targetBWIndex) const { float abortThreshold = -1.0f; if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { /* If we're switching down, we need to decide whether to 1) finish last segment of high-bandwidth variant, or 2) abort last segment of high-bandwidth variant, and fetch an overlapping portion from low-bandwidth variant. Here we try to maximize the amount of buffer left when the switch point is met. Given the following parameters: B: our current buffering level in seconds T: target duration in seconds X: sample duration in seconds remain to fetch in last segment bw0: bandwidth of old variant (as specified in playlist) bw1: bandwidth of new variant (as specified in playlist) bw: measured bandwidth available If we choose 1), when switch happens at the end of current segment, our buffering will be B + X - X * bw0 / bw If we choose 2), when switch happens where we aborted current segment, our buffering will be B - (T - X) * bw1 / bw We should only choose 1) if X/T < bw1 / (bw1 + bw0 - bw) */ // abort old bandwidth immediately if bandwidth is fluctuating a lot. // our estimate could be far off, and fetching old bandwidth could // take too long. if (!mLastBandwidthStable) { return 0.0f; } // Taking the measured current bandwidth at 50% face value only, // as our bandwidth estimation is a lagging indicator. Being // conservative on this, we prefer switching to lower bandwidth // unless we're really confident finishing up the last segment // of higher bandwidth will be fast. CHECK(mLastBandwidthBps >= 0); abortThreshold = (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth - (float)mLastBandwidthBps * 0.5f); if (abortThreshold < 0.0f) { abortThreshold = -1.0f; // do not abort } ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", mBandwidthItems.itemAt(currentBWIndex).mBandwidth, mBandwidthItems.itemAt(targetBWIndex).mBandwidth, mLastBandwidthBps, abortThreshold); } return abortThreshold; } void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); } ssize_t LiveSession::getLowestValidBandwidthIndex() const { for (size_t index = 0; index < mBandwidthItems.size(); index++) { if (isBandwidthValid(mBandwidthItems[index])) { return index; } } // if playlists are all blacklisted, return 0 and hope it's alive return 0; } size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { if (mBandwidthItems.size() < 2) { // shouldn't be here if we only have 1 bandwidth, check // logic to get rid of redundant bandwidth polling ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); return 0; } #if 1 char value[PROPERTY_VALUE_MAX]; ssize_t index = -1; if (property_get("media.httplive.bw-index", value, NULL)) { char *end; index = strtol(value, &end, 10); CHECK(end > value && *end == '\0'); if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { index = mBandwidthItems.size() - 1; } } if (index < 0) { char value[PROPERTY_VALUE_MAX]; if (property_get("media.httplive.max-bw", value, NULL)) { char *end; long maxBw = strtoul(value, &end, 10); if (end > value && *end == '\0') { if (maxBw > 0 && bandwidthBps > maxBw) { ALOGV("bandwidth capped to %ld bps", maxBw); bandwidthBps = maxBw; } } } // Pick the highest bandwidth stream that's not currently blacklisted // below or equal to estimated bandwidth. index = mBandwidthItems.size() - 1; ssize_t lowestBandwidth = getLowestValidBandwidthIndex(); while (index > lowestBandwidth) { // be conservative (70%) to avoid overestimating and immediately // switching down again. size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; const BandwidthItem &item = mBandwidthItems[index]; if (item.mBandwidth <= adjustedBandwidthBps && isBandwidthValid(item)) { break; } --index; } } #elif 0 // Change bandwidth at random() size_t index = uniformRand() * mBandwidthItems.size(); #elif 0 // There's a 50% chance to stay on the current bandwidth and // a 50% chance to switch to the next higher bandwidth (wrapping around // to lowest) const size_t kMinIndex = 0; static ssize_t mCurBandwidthIndex = -1; size_t index; if (mCurBandwidthIndex < 0) { index = kMinIndex; } else if (uniformRand() < 0.5) { index = (size_t)mCurBandwidthIndex; } else { index = mCurBandwidthIndex + 1; if (index == mBandwidthItems.size()) { index = kMinIndex; } } mCurBandwidthIndex = index; #elif 0 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec size_t index = mBandwidthItems.size() - 1; while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { --index; } #elif 1 char value[PROPERTY_VALUE_MAX]; size_t index; if (property_get("media.httplive.bw-index", value, NULL)) { char *end; index = strtoul(value, &end, 10); CHECK(end > value && *end == '\0'); if (index >= mBandwidthItems.size()) { index = mBandwidthItems.size() - 1; } } else { index = 0; } #else size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream #endif CHECK_GE(index, 0); return index; } HLSTime LiveSession::latestMediaSegmentStartTime() const { HLSTime audioTime(mPacketSources.valueFor( STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); HLSTime videoTime(mPacketSources.valueFor( STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); return audioTime < videoTime ? videoTime : audioTime; } void LiveSession::onSeek(const sp &msg) { int64_t timeUs; CHECK(msg->findInt64("timeUs", &timeUs)); changeConfiguration(timeUs); } status_t LiveSession::getDuration(int64_t *durationUs) const { int64_t maxDurationUs = -1ll; for (size_t i = 0; i < mFetcherInfos.size(); ++i) { int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; if (fetcherDurationUs > maxDurationUs) { maxDurationUs = fetcherDurationUs; } } *durationUs = maxDurationUs; return OK; } bool LiveSession::isSeekable() const { int64_t durationUs; return getDuration(&durationUs) == OK && durationUs >= 0; } bool LiveSession::hasDynamicDuration() const { return false; } size_t LiveSession::getTrackCount() const { if (mPlaylist == NULL) { return 0; } else { return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0); } } sp LiveSession::getTrackInfo(size_t trackIndex) const { if (mPlaylist == NULL) { return NULL; } else { if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) { sp format = new AMessage(); format->setInt32("type", MEDIA_TRACK_TYPE_METADATA); format->setString("language", "und"); format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3); return format; } return mPlaylist->getTrackInfo(trackIndex); } } status_t LiveSession::selectTrack(size_t index, bool select) { if (mPlaylist == NULL) { return INVALID_OPERATION; } ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++", index, select, mSubtitleGeneration); ++mSubtitleGeneration; status_t err = mPlaylist->selectTrack(index, select); if (err == OK) { sp msg = new AMessage(kWhatChangeConfiguration, this); msg->setInt32("pickTrack", select); msg->post(); } return err; } ssize_t LiveSession::getSelectedTrack(media_track_type type) const { if (mPlaylist == NULL) { return -1; } else { return mPlaylist->getSelectedTrack(type); } } void LiveSession::changeConfiguration( int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d", (long long)timeUs, bandwidthIndex, pickTrack); cancelBandwidthSwitch(); CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; if (bandwidthIndex >= 0) { mOrigBandwidthIndex = mCurBandwidthIndex; mCurBandwidthIndex = bandwidthIndex; if (mOrigBandwidthIndex != mCurBandwidthIndex) { ALOGI("#### Starting Bandwidth Switch: %zd => %zd", mOrigBandwidthIndex, mCurBandwidthIndex); } } CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); uint32_t streamMask = 0; // streams that should be fetched by the new fetcher uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher AString URIs[kMaxStreams]; for (size_t i = 0; i < kMaxStreams; ++i) { if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { streamMask |= indexToType(i); } } // Step 1, stop and discard fetchers that are no longer needed. // Pause those that we'll reuse. for (size_t i = 0; i < mFetcherInfos.size(); ++i) { // skip fetchers that are marked mToBeRemoved, // these are done and can't be reused if (mFetcherInfos[i].mToBeRemoved) { continue; } const AString &uri = mFetcherInfos.keyAt(i); sp &fetcher = mFetcherInfos.editValueAt(i).mFetcher; bool discardFetcher = true, delayRemoval = false; for (size_t j = 0; j < kMaxStreams; ++j) { StreamType type = indexToType(j); if ((streamMask & type) && uri == URIs[j]) { resumeMask |= type; streamMask &= ~type; discardFetcher = false; } } // Delay fetcher removal if not picking tracks, AND old fetcher // has stream mask that overlaps new variant. (Okay to discard // old fetcher now, if completely no overlap.) if (discardFetcher && timeUs < 0ll && !pickTrack && (fetcher->getStreamTypeMask() & streamMask)) { discardFetcher = false; delayRemoval = true; } if (discardFetcher) { ALOGV("discarding fetcher-%d", fetcher->getFetcherID()); fetcher->stopAsync(); } else { float threshold = 0.0f; // default to pause after current block (47Kbytes) bool disconnect = false; if (timeUs >= 0ll) { // seeking, no need to finish fetching disconnect = true; } else if (delayRemoval) { // adapting, abort if remaining of current segment is over threshold threshold = getAbortThreshold( mOrigBandwidthIndex, mCurBandwidthIndex); } ALOGV("pausing fetcher-%d, threshold=%.2f", fetcher->getFetcherID(), threshold); fetcher->pauseAsync(threshold, disconnect); } } sp msg; if (timeUs < 0ll) { // skip onChangeConfiguration2 (decoder destruction) if not seeking. msg = new AMessage(kWhatChangeConfiguration3, this); } else { msg = new AMessage(kWhatChangeConfiguration2, this); } msg->setInt32("streamMask", streamMask); msg->setInt32("resumeMask", resumeMask); msg->setInt32("pickTrack", pickTrack); msg->setInt64("timeUs", timeUs); for (size_t i = 0; i < kMaxStreams; ++i) { if ((streamMask | resumeMask) & indexToType(i)) { msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); } } // Every time a fetcher acknowledges the stopAsync or pauseAsync request // we'll decrement mContinuationCounter, once it reaches zero, i.e. all // fetchers have completed their asynchronous operation, we'll post // mContinuation, which then is handled below in onChangeConfiguration2. mContinuationCounter = mFetcherInfos.size(); mContinuation = msg; if (mContinuationCounter == 0) { msg->post(); } } void LiveSession::onChangeConfiguration(const sp &msg) { ALOGV("onChangeConfiguration"); if (!mReconfigurationInProgress) { int32_t pickTrack = 0; msg->findInt32("pickTrack", &pickTrack); changeConfiguration(-1ll /* timeUs */, -1, pickTrack); } else { msg->post(1000000ll); // retry in 1 sec } } void LiveSession::onChangeConfiguration2(const sp &msg) { ALOGV("onChangeConfiguration2"); mContinuation.clear(); // All fetchers are either suspended or have been removed now. // If we're seeking, clear all packet sources before we report // seek complete, to prevent decoder from pulling stale data. int64_t timeUs; CHECK(msg->findInt64("timeUs", &timeUs)); if (timeUs >= 0) { mLastSeekTimeUs = timeUs; mLastDequeuedTimeUs = timeUs; for (size_t i = 0; i < mPacketSources.size(); i++) { sp packetSource = mPacketSources.editValueAt(i); sp format = packetSource->getFormat(); packetSource->clear(); // Set a tentative format here such that HTTPLiveSource will always have // a format available when NuPlayer queries. Without an available video // format when setting a surface NuPlayer might disable video decoding // altogether. The tentative format will be overwritten by the // authoritative (and possibly same) format once content from the new // position is dequeued. packetSource->setFormat(format); } for (size_t i = 0; i < kMaxStreams; ++i) { mStreams[i].reset(); } mDiscontinuityOffsetTimesUs.clear(); mDiscontinuityAbsStartTimesUs.clear(); if (mSeekReplyID != NULL) { CHECK(mSeekReply != NULL); mSeekReply->setInt32("err", OK); mSeekReply->postReply(mSeekReplyID); mSeekReplyID.clear(); mSeekReply.clear(); } // restart buffer polling after seek becauese previous // buffering position is no longer valid. restartPollBuffering(); } uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); streamMask |= resumeMask; AString URIs[kMaxStreams]; for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { const AString &uriKey = mStreams[i].uriKey(); CHECK(msg->findString(uriKey.c_str(), &URIs[i])); ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); } } uint32_t changedMask = 0; for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { // stream URI could change even if onChangeConfiguration2 is only // used for seek. Seek could happen during a bw switch, in this // case bw switch will be cancelled, but the seekTo position will // fetch from the new URI. if ((mStreamMask & streamMask & indexToType(i)) && !mStreams[i].mUri.empty() && !(URIs[i] == mStreams[i].mUri)) { ALOGV("stream %zu changed: oldURI %s, newURI %s", i, mStreams[i].mUri.c_str(), URIs[i].c_str()); sp source = mPacketSources.valueFor(indexToType(i)); if (source->getLatestDequeuedMeta() != NULL) { source->queueDiscontinuity( ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); } } // Determine which decoders to shutdown on the player side, // a decoder has to be shutdown if its streamtype was active // before but now longer isn't. if ((mStreamMask & ~streamMask & indexToType(i))) { changedMask |= indexToType(i); } } if (changedMask == 0) { // If nothing changed as far as the audio/video decoders // are concerned we can proceed. onChangeConfiguration3(msg); return; } // Something changed, inform the player which will shutdown the // corresponding decoders and will post the reply once that's done. // Handling the reply will continue executing below in // onChangeConfiguration3. sp notify = mNotify->dup(); notify->setInt32("what", kWhatStreamsChanged); notify->setInt32("changedMask", changedMask); msg->setWhat(kWhatChangeConfiguration3); msg->setTarget(this); notify->setMessage("reply", msg); notify->post(); } void LiveSession::onChangeConfiguration3(const sp &msg) { mContinuation.clear(); // All remaining fetchers are still suspended, the player has shutdown // any decoders that needed it. uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); mNewStreamMask = streamMask | resumeMask; int64_t timeUs; int32_t pickTrack; bool switching = false; CHECK(msg->findInt64("timeUs", &timeUs)); CHECK(msg->findInt32("pickTrack", &pickTrack)); if (timeUs < 0ll) { if (!pickTrack) { // mSwapMask contains streams that are in both old and new variant, // (in mNewStreamMask & mStreamMask) but with different URIs // (not in resumeMask). // For example, old variant has video and audio in two separate // URIs, and new variant has only audio with unchanged URI. mSwapMask // should be 0 as there is nothing to swap. We only need to stop video, // and resume audio. mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask; switching = (mSwapMask != 0); } mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; } else { mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; } ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, " "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x", (long long)timeUs, switching, pickTrack, mStreamMask, mNewStreamMask, mSwapMask); for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { if (switching) { CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); } else { CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); } } } // Of all existing fetchers: // * Resume fetchers that are still needed and assign them original packet sources. // * Mark otherwise unneeded fetchers for removal. ALOGV("resuming fetchers for mask 0x%08x", resumeMask); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { const AString &uri = mFetcherInfos.keyAt(i); if (!resumeFetcher(uri, resumeMask, timeUs)) { ALOGV("marking fetcher-%d to be removed", mFetcherInfos[i].mFetcher->getFetcherID()); mFetcherInfos.editValueAt(i).mToBeRemoved = true; } } // streamMask now only contains the types that need a new fetcher created. if (streamMask != 0) { ALOGV("creating new fetchers for mask 0x%08x", streamMask); } // Find out when the original fetchers have buffered up to and start the new fetchers // at a later timestamp. for (size_t i = 0; i < kMaxStreams; i++) { if (!(indexToType(i) & streamMask)) { continue; } AString uri; uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; sp fetcher = addFetcher(uri.c_str()); CHECK(fetcher != NULL); HLSTime startTime; SeekMode seekMode = kSeekModeExactPosition; sp sources[kNumSources]; if (i == kSubtitleIndex || (!pickTrack && !switching)) { startTime = latestMediaSegmentStartTime(); } // TRICKY: looping from i as earlier streams are already removed from streamMask for (size_t j = i; j < kMaxStreams; ++j) { const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; if ((streamMask & indexToType(j)) && uri == streamUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); if (timeUs >= 0) { startTime.mTimeUs = timeUs; } else { int32_t type; sp meta; if (!switching) { // selecting, or adapting but no swap required meta = sources[j]->getLatestDequeuedMeta(); } else { // adapting and swap required meta = sources[j]->getLatestEnqueuedMeta(); if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { // switching up meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); } } if ((j == kAudioIndex || j == kVideoIndex) && meta != NULL && !meta->findInt32("discontinuity", &type)) { HLSTime tmpTime(meta); if (startTime < tmpTime) { startTime = tmpTime; } } if (!switching) { // selecting, or adapting but no swap required sources[j]->clear(); if (j == kSubtitleIndex) { break; } ALOGV("stream[%zu]: queue format change", j); sources[j]->queueDiscontinuity( ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); } else { // switching, queue discontinuities after resume sources[j] = mPacketSources2.valueFor(indexToType(j)); sources[j]->clear(); // the new fetcher might be providing streams that used to be // provided by two different fetchers, if one of the fetcher // paused in the middle while the other somehow paused in next // seg, we have to start from next seg. if (seekMode < mStreams[j].mSeekMode) { seekMode = mStreams[j].mSeekMode; } } } streamMask &= ~indexToType(j); } } ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld " "segmentStartTimeUs %lld seekMode %d", fetcher->getFetcherID(), (long long)startTime.mTimeUs, (long long)mLastSeekTimeUs, (long long)startTime.getSegmentTimeUs(), seekMode); // Set the target segment start time to the middle point of the // segment where the last sample was. // This gives a better guess if segments of the two variants are not // perfectly aligned. (If the corresponding segment in new variant // starts slightly later than that in the old variant, we still want // to pick that segment, not the one before) fetcher->startAsync( sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], getMetadataSource(sources, mNewStreamMask, switching), startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs, startTime.getSegmentTimeUs(), startTime.mSeq, seekMode); } // All fetchers have now been started, the configuration change // has completed. mReconfigurationInProgress = false; if (switching) { mSwitchInProgress = true; } else { mStreamMask = mNewStreamMask; if (mOrigBandwidthIndex != mCurBandwidthIndex) { ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd", mOrigBandwidthIndex, mCurBandwidthIndex); mOrigBandwidthIndex = mCurBandwidthIndex; } } ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x", mSwitchInProgress, mStreamMask); if (mDisconnectReplyID != NULL) { finishDisconnect(); } } void LiveSession::swapPacketSource(StreamType stream) { ALOGV("[%s] swapPacketSource", getNameForStream(stream)); // transfer packets from source2 to source sp &aps = mPacketSources.editValueFor(stream); sp &aps2 = mPacketSources2.editValueFor(stream); // queue discontinuity in mPacketSource aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); // queue packets in mPacketSource2 to mPacketSource status_t finalResult = OK; sp accessUnit; while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && OK == aps2->dequeueAccessUnit(&accessUnit)) { aps->queueAccessUnit(accessUnit); } aps2->clear(); } void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { if (!mSwitchInProgress) { return; } ssize_t index = mFetcherInfos.indexOfKey(oldUri); if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { return; } // Swap packet source of streams provided by old variant for (size_t idx = 0; idx < kMaxStreams; idx++) { StreamType stream = indexToType(idx); if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { swapPacketSource(stream); if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); } mStreams[idx].mUri = mStreams[idx].mNewUri; mStreams[idx].mNewUri.clear(); mSwapMask &= ~stream; } } mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask); if (mSwapMask != 0) { return; } // Check if new variant contains extra streams. uint32_t extraStreams = mNewStreamMask & (~mStreamMask); while (extraStreams) { StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); extraStreams &= ~stream; swapPacketSource(stream); ssize_t idx = typeToIndex(stream); CHECK(idx >= 0); if (mStreams[idx].mNewUri.empty()) { ALOGW("swapping extra stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); } mStreams[idx].mUri = mStreams[idx].mNewUri; mStreams[idx].mNewUri.clear(); } // Restart new fetcher (it was paused after the first 47k block) // and let it fetch into mPacketSources (not mPacketSources2) for (size_t i = 0; i < mFetcherInfos.size(); ++i) { FetcherInfo &info = mFetcherInfos.editValueAt(i); if (info.mToBeResumed) { resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); info.mToBeResumed = false; } } ALOGI("#### Finished Bandwidth Switch: %zd => %zd", mOrigBandwidthIndex, mCurBandwidthIndex); mStreamMask = mNewStreamMask; mSwitchInProgress = false; mOrigBandwidthIndex = mCurBandwidthIndex; restartPollBuffering(); } void LiveSession::schedulePollBuffering() { sp msg = new AMessage(kWhatPollBuffering, this); msg->setInt32("generation", mPollBufferingGeneration); msg->post(1000000ll); } void LiveSession::cancelPollBuffering() { ++mPollBufferingGeneration; mPrevBufferPercentage = -1; } void LiveSession::restartPollBuffering() { cancelPollBuffering(); onPollBuffering(); } void LiveSession::onPollBuffering() { ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", mSwitchInProgress, mReconfigurationInProgress, mInPreparationPhase, mCurBandwidthIndex, mStreamMask); bool underflow, ready, down, up; if (checkBuffering(underflow, ready, down, up)) { if (mInPreparationPhase) { // Allow down switch even if we're still preparing. // // Some streams have a high bandwidth index as default, // when bandwidth is low, it takes a long time to buffer // to ready mark, then it immediately pauses after start // as we have to do a down switch. It's better experience // to restart from a lower index, if we detect low bw. if (!switchBandwidthIfNeeded(false /* up */, down) && ready) { postPrepared(OK); } } if (!mInPreparationPhase) { if (ready) { stopBufferingIfNecessary(); } else if (underflow) { startBufferingIfNecessary(); } switchBandwidthIfNeeded(up, down); } } schedulePollBuffering(); } void LiveSession::cancelBandwidthSwitch(bool resume) { ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); if (!mSwitchInProgress) { return; } for (size_t i = 0; i < mFetcherInfos.size(); ++i) { FetcherInfo& info = mFetcherInfos.editValueAt(i); if (info.mToBeRemoved) { info.mToBeRemoved = false; if (resume) { resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); } } } for (size_t i = 0; i < kMaxStreams; ++i) { AString newUri = mStreams[i].mNewUri; if (!newUri.empty()) { // clear all mNewUri matching this newUri for (size_t j = i; j < kMaxStreams; ++j) { if (mStreams[j].mNewUri == newUri) { mStreams[j].mNewUri.clear(); } } ALOGV("stopping newUri = %s", newUri.c_str()); ssize_t index = mFetcherInfos.indexOfKey(newUri); if (index < 0) { ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); continue; } FetcherInfo &info = mFetcherInfos.editValueAt(index); info.mToBeRemoved = true; info.mFetcher->stopAsync(); } } ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", mOrigBandwidthIndex, mCurBandwidthIndex); mSwitchGeneration++; mSwitchInProgress = false; mCurBandwidthIndex = mOrigBandwidthIndex; mSwapMask = 0; } bool LiveSession::checkBuffering( bool &underflow, bool &ready, bool &down, bool &up) { underflow = ready = down = up = false; if (mReconfigurationInProgress) { ALOGV("Switch/Reconfig in progress, defer buffer polling"); return false; } size_t activeCount, underflowCount, readyCount, downCount, upCount; activeCount = underflowCount = readyCount = downCount = upCount =0; int32_t minBufferPercent = -1; int64_t durationUs; if (getDuration(&durationUs) != OK) { durationUs = -1; } for (size_t i = 0; i < mPacketSources.size(); ++i) { // we don't check subtitles for buffering level if (!(mStreamMask & mPacketSources.keyAt(i) & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { continue; } // ignore streams that never had any packet queued. // (it's possible that the variant only has audio or video) sp meta = mPacketSources[i]->getLatestEnqueuedMeta(); if (meta == NULL) { continue; } status_t finalResult; int64_t bufferedDurationUs = mPacketSources[i]->getBufferedDurationUs(&finalResult); ALOGV("[%s] buffered %lld us", getNameForStream(mPacketSources.keyAt(i)), (long long)bufferedDurationUs); if (durationUs >= 0) { int32_t percent; if (mPacketSources[i]->isFinished(0 /* duration */)) { percent = 100; } else { percent = (int32_t)(100.0 * (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); } if (minBufferPercent < 0 || percent < minBufferPercent) { minBufferPercent = percent; } } ++activeCount; int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs; if (bufferedDurationUs > readyMark || mPacketSources[i]->isFinished(0)) { ++readyCount; } if (!mPacketSources[i]->isFinished(0)) { if (bufferedDurationUs < kUnderflowMarkUs) { ++underflowCount; } if (bufferedDurationUs > mUpSwitchMark) { ++upCount; } if (bufferedDurationUs < mDownSwitchMark) { ++downCount; } } } if (minBufferPercent >= 0) { notifyBufferingUpdate(minBufferPercent); } if (activeCount > 0) { up = (upCount == activeCount); down = (downCount > 0); ready = (readyCount == activeCount); underflow = (underflowCount > 0); return true; } return false; } void LiveSession::startBufferingIfNecessary() { ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", mInPreparationPhase, mBuffering); if (!mBuffering) { mBuffering = true; sp notify = mNotify->dup(); notify->setInt32("what", kWhatBufferingStart); notify->post(); } } void LiveSession::stopBufferingIfNecessary() { ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", mInPreparationPhase, mBuffering); if (mBuffering) { mBuffering = false; sp notify = mNotify->dup(); notify->setInt32("what", kWhatBufferingEnd); notify->post(); } } void LiveSession::notifyBufferingUpdate(int32_t percentage) { if (percentage < mPrevBufferPercentage) { percentage = mPrevBufferPercentage; } else if (percentage > 100) { percentage = 100; } mPrevBufferPercentage = percentage; ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); sp notify = mNotify->dup(); notify->setInt32("what", kWhatBufferingUpdate); notify->setInt32("percentage", percentage); notify->post(); } bool LiveSession::tryBandwidthFallback() { if (mInPreparationPhase || mReconfigurationInProgress) { // Don't try fallback during prepare or reconfig. // If error happens there, it's likely unrecoverable. return false; } if (mCurBandwidthIndex > mOrigBandwidthIndex) { // if we're switching up, simply cancel and resume old variant cancelBandwidthSwitch(true /* resume */); return true; } else { // if we're switching down, we're likely about to underflow (if // not already underflowing). try the lowest viable bandwidth if // not on that variant already. ssize_t lowestValid = getLowestValidBandwidthIndex(); if (mCurBandwidthIndex > lowestValid) { cancelBandwidthSwitch(); changeConfiguration(-1ll, lowestValid); return true; } } // return false if we couldn't find any fallback return false; } /* * returns true if a bandwidth switch is actually needed (and started), * returns false otherwise */ bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { // no need to check bandwidth if we only have 1 bandwidth settings if (mBandwidthItems.size() < 2) { return false; } if (mSwitchInProgress) { if (mBuffering) { tryBandwidthFallback(); } return false; } int32_t bandwidthBps, shortTermBps; bool isStable; if (mBandwidthEstimator->estimateBandwidth( &bandwidthBps, &isStable, &shortTermBps)) { ALOGV("bandwidth estimated at %.2f kbps, " "stable %d, shortTermBps %.2f kbps", bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f); mLastBandwidthBps = bandwidthBps; mLastBandwidthStable = isStable; } else { ALOGV("no bandwidth estimate."); return false; } int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; // canSwithDown and canSwitchUp can't both be true. // we only want to switch up when measured bw is 120% higher than current variant, // and we only want to switch down when measured bw is below current variant. bool canSwitchDown = bufferLow && (bandwidthBps < (int32_t)curBandwidth); bool canSwitchUp = bufferHigh && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); if (canSwitchDown || canSwitchUp) { // bandwidth estimating has some delay, if we have to downswitch when // it hasn't stabilized, use the short term to guess real bandwidth, // since it may be dropping too fast. // (note this doesn't apply to upswitch, always use longer average there) if (!isStable && canSwitchDown) { if (shortTermBps < bandwidthBps) { bandwidthBps = shortTermBps; } } ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); // it's possible that we're checking for canSwitchUp case, but the returned // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% // of measured bw. In that case we don't want to do anything, since we have // both enough buffer and enough bw. if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) { // if not yet prepared, just restart again with new bw index. // this is faster and playback experience is cleaner. changeConfiguration( mInPreparationPhase ? 0 : -1ll, bandwidthIndex); return true; } } return false; } void LiveSession::postError(status_t err) { // if we reached EOS, notify buffering of 100% if (err == ERROR_END_OF_STREAM) { notifyBufferingUpdate(100); } // we'll stop buffer polling now, before that notify // stop buffering to stop the spinning icon stopBufferingIfNecessary(); cancelPollBuffering(); sp notify = mNotify->dup(); notify->setInt32("what", kWhatError); notify->setInt32("err", err); notify->post(); } void LiveSession::postPrepared(status_t err) { CHECK(mInPreparationPhase); sp notify = mNotify->dup(); if (err == OK || err == ERROR_END_OF_STREAM) { notify->setInt32("what", kWhatPrepared); } else { cancelPollBuffering(); notify->setInt32("what", kWhatPreparationFailed); notify->setInt32("err", err); } notify->post(); mInPreparationPhase = false; } } // namespace android