From 7c8708046117e03c0d38006bdd9685139df3ac6b Mon Sep 17 00:00:00 2001 From: Chong Zhang Date: Tue, 17 Mar 2015 16:27:56 -0700 Subject: HLS: faster switching and pause/resume on low buffer - when upswitching, discard excessive buffering on low bandwidth variant, switch to new variant earlier - when downswitching, report newly found IDR positions continuously, and switch as soon as new fetcher passes playback position. This allows us to skip time-consuming resumeUntil() of old fetcher most of the time - implement pause/resume on low buffering, and notify buffering percentage - buffering parameter tuning, separate pause/resume/ready buffer level and up/down switch buffer level, boost up fetcher buffering significantly bug: 19567254 Change-Id: I750dfcc6f861d78d16a71f501beb86d8129cb048 --- .../nuplayer/HTTPLiveSource.cpp | 28 + media/libstagefright/httplive/LiveSession.cpp | 666 ++++++++++++++++----- media/libstagefright/httplive/LiveSession.h | 50 +- media/libstagefright/httplive/PlaylistFetcher.cpp | 340 ++++++----- media/libstagefright/httplive/PlaylistFetcher.h | 11 +- .../libstagefright/mpeg2ts/AnotherPacketSource.cpp | 164 ++++- media/libstagefright/mpeg2ts/AnotherPacketSource.h | 7 + 7 files changed, 945 insertions(+), 321 deletions(-) (limited to 'media') diff --git a/media/libmediaplayerservice/nuplayer/HTTPLiveSource.cpp b/media/libmediaplayerservice/nuplayer/HTTPLiveSource.cpp index d01e83a..0476c9b 100644 --- a/media/libmediaplayerservice/nuplayer/HTTPLiveSource.cpp +++ b/media/libmediaplayerservice/nuplayer/HTTPLiveSource.cpp @@ -281,6 +281,34 @@ void NuPlayer::HTTPLiveSource::onSessionNotify(const sp &msg) { break; } + case LiveSession::kWhatBufferingStart: + { + sp notify = dupNotify(); + notify->setInt32("what", kWhatPauseOnBufferingStart); + notify->post(); + break; + } + + case LiveSession::kWhatBufferingEnd: + { + sp notify = dupNotify(); + notify->setInt32("what", kWhatResumeOnBufferingEnd); + notify->post(); + break; + } + + + case LiveSession::kWhatBufferingUpdate: + { + sp notify = dupNotify(); + int32_t percentage; + CHECK(msg->findInt32("percentage", &percentage)); + notify->setInt32("what", kWhatBufferingUpdate); + notify->setInt32("percentage", percentage); + notify->post(); + break; + } + case LiveSession::kWhatError: { break; diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index 738f8b6..4ac2fb2 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -49,12 +50,6 @@ namespace android { -// static -// High water mark to start up switch or report prepared) -const int64_t LiveSession::kHighWaterMark = 8000000ll; -const int64_t LiveSession::kMidWaterMark = 5000000ll; -const int64_t LiveSession::kLowWaterMark = 3000000ll; - struct LiveSession::BandwidthEstimator : public RefBase { BandwidthEstimator(); @@ -119,15 +114,34 @@ bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) { return true; } +//static +const char *LiveSession::getKeyForStream(StreamType type) { + switch (type) { + case STREAMTYPE_VIDEO: + return "timeUsVideo"; + case STREAMTYPE_AUDIO: + return "timeUsAudio"; + case STREAMTYPE_SUBTITLES: + return "timeUsSubtitle"; + default: + TRESPASS(); + } + return NULL; +} + LiveSession::LiveSession( const sp ¬ify, uint32_t flags, const sp &httpService) : mNotify(notify), mFlags(flags), mHTTPService(httpService), + mBuffering(false), mInPreparationPhase(true), + mPollBufferingGeneration(0), + mPrevBufferPercentage(-1), mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), mCurBandwidthIndex(-1), + mOrigBandwidthIndex(-1), mLastBandwidthBps(-1ll), mBandwidthEstimator(new BandwidthEstimator()), mStreamMask(0), @@ -139,11 +153,12 @@ LiveSession::LiveSession( mRealTimeBaseUs(0ll), mReconfigurationInProgress(false), mSwitchInProgress(false), + mUpSwitchMark(kUpSwitchMark), + mDownSwitchMark(kDownSwitchMark), + mUpSwitchMargin(kUpSwitchMargin), mFirstTimeUsValid(false), mFirstTimeUs(0), - mLastSeekTimeUs(0), - mPollBufferingGeneration(0) { - + mLastSeekTimeUs(0) { mStreams[kAudioIndex] = StreamItem("audio"); mStreams[kVideoIndex] = StreamItem("video"); mStreams[kSubtitleIndex] = StreamItem("subtitles"); @@ -162,12 +177,6 @@ LiveSession::~LiveSession() { status_t LiveSession::dequeueAccessUnit( StreamType stream, sp *accessUnit) { - if (!(mStreamMask & stream)) { - // return -EWOULDBLOCK to avoid halting the decoder - // when switching between audio/video and audio only. - return -EWOULDBLOCK; - } - status_t finalResult = OK; sp packetSource = mPacketSources.valueFor(stream); @@ -225,26 +234,6 @@ status_t LiveSession::dequeueAccessUnit( streamStr, type, extra == NULL ? "NULL" : extra->debugString().c_str()); - - size_t seq = strm.mCurDiscontinuitySeq; - int64_t offsetTimeUs; - if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { - offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); - } else { - offsetTimeUs = 0; - } - - seq += 1; - if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { - int64_t firstTimeUs; - firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); - offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; - offsetTimeUs += strm.mLastSampleDurationUs; - } else { - offsetTimeUs += strm.mLastSampleDurationUs; - } - - mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); } else if (err == OK) { if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { @@ -252,7 +241,26 @@ status_t LiveSession::dequeueAccessUnit( int32_t discontinuitySeq = 0; CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); - strm.mCurDiscontinuitySeq = discontinuitySeq; + if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { + int64_t offsetTimeUs; + if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); + } else { + offsetTimeUs = 0; + } + + if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + int64_t firstTimeUs; + firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); + offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; + offsetTimeUs += strm.mLastSampleDurationUs; + } else { + offsetTimeUs += strm.mLastSampleDurationUs; + } + + mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); + strm.mCurDiscontinuitySeq = discontinuitySeq; + } int32_t discard = 0; int64_t firstTimeUs; @@ -317,6 +325,11 @@ status_t LiveSession::getStreamFormat(StreamType stream, sp *format) { return -EAGAIN; } + if (stream == STREAMTYPE_AUDIO) { + // set AAC input buffer size to 32K bytes (256kbps x 1sec) + meta->setInt32(kKeyMaxInputSize, 32 * 1024); + } + return convertMetaDataToMessage(meta, format); } @@ -357,6 +370,102 @@ status_t LiveSession::seekTo(int64_t timeUs) { return err; } +bool LiveSession::checkSwitchProgress( + sp &stopParams, int64_t delayUs, bool *needResumeUntil) { + AString newUri; + CHECK(stopParams->findString("uri", &newUri)); + + *needResumeUntil = false; + sp firstNewMeta[kMaxStreams]; + for (size_t i = 0; i < kMaxStreams; ++i) { + StreamType stream = indexToType(i); + if (!(mSwapMask & mNewStreamMask & stream) + || (mStreams[i].mNewUri != newUri)) { + continue; + } + if (stream == STREAMTYPE_SUBTITLES) { + continue; + } + sp &source = mPacketSources.editValueAt(i); + + // First, get latest dequeued meta, which is where the decoder is at. + // (when upswitching, we take the meta after a certain delay, so that + // the decoder is left with some cushion) + sp lastDequeueMeta, lastEnqueueMeta; + if (delayUs > 0) { + lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); + } else { + lastDequeueMeta = source->getLatestDequeuedMeta(); + } + // Then, trim off packets at beginning of mPacketSources2 that's before + // the latest dequeued time. These samples are definitely too late. + int64_t lastTimeUs, startTimeUs; + int32_t lastSeq, startSeq; + if (lastDequeueMeta != NULL) { + CHECK(lastDequeueMeta->findInt64("timeUs", &lastTimeUs)); + CHECK(lastDequeueMeta->findInt32("discontinuitySeq", &lastSeq)); + firstNewMeta[i] = mPacketSources2.editValueAt(i) + ->trimBuffersBeforeTimeUs(lastSeq, lastTimeUs); + } + // Now firstNewMeta[i] is the first sample after the trim. + // If it's NULL, we failed because dequeue already past all samples + // in mPacketSource2, we have to try again. + if (firstNewMeta[i] == NULL) { + ALOGV("[%s] dequeue time (%d, %lld) past start time", + stream == STREAMTYPE_AUDIO ? "audio" : "video", + lastSeq, (long long) lastTimeUs); + return false; + } + + // Otherwise, we check if mPacketSources2 overlaps with what old fetcher + // already fetched, and see if we need to resumeUntil + lastEnqueueMeta = source->getLatestEnqueuedMeta(); + // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity + // boundary, no need to resume as the content will look different anyways + if (lastEnqueueMeta != NULL) { + CHECK(lastEnqueueMeta->findInt64("timeUs", &lastTimeUs)); + CHECK(lastEnqueueMeta->findInt32("discontinuitySeq", &lastSeq)); + CHECK(firstNewMeta[i]->findInt64("timeUs", &startTimeUs)); + CHECK(firstNewMeta[i]->findInt32("discontinuitySeq", &startSeq)); + + // no need to resume old fetcher if new fetcher started in different + // discontinuity sequence, as the content will look different. + *needResumeUntil |= + (startSeq == lastSeq + && startTimeUs - lastTimeUs > 100000ll); + + // update the stopTime for resumeUntil, as we might have removed some + // packets from the head in mPacketSource2 + stopParams->setInt64(getKeyForStream(stream), startTimeUs); + } + } + + // if we're here, it means dequeue progress hasn't passed some samples in + // mPacketSource2, we can trim off the excess in mPacketSource. + // (old fetcher might still need to resumeUntil the start time of new fetcher) + for (size_t i = 0; i < kMaxStreams; ++i) { + StreamType stream = indexToType(i); + if (!(mSwapMask & mNewStreamMask & stream) + || (newUri != mStreams[i].mNewUri)) { + continue; + } + if (stream == STREAMTYPE_SUBTITLES) { + continue; + } + int64_t startTimeUs; + int32_t startSeq; + CHECK(firstNewMeta[i] != NULL); + CHECK(firstNewMeta[i]->findInt64("timeUs", &startTimeUs)); + CHECK(firstNewMeta[i]->findInt32("discontinuitySeq", &startSeq)); + mPacketSources.valueFor(stream)->trimBuffersAfterTimeUs(startSeq, startTimeUs); + } + + // no resumeUntil if already underflow + *needResumeUntil &= !mBuffering; + + return true; +} + void LiveSession::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatConnect: @@ -412,8 +521,6 @@ void LiveSession::onMessageReceived(const sp &msg) { } if (what == PlaylistFetcher::kWhatStopped) { - tryToFinishBandwidthSwitch(uri); - mFetcherLooper->unregisterHandler( mFetcherInfos[index].mFetcher->id()); mFetcherInfos.removeItemsAt(index); @@ -452,6 +559,16 @@ void LiveSession::onMessageReceived(const sp &msg) { break; } + case PlaylistFetcher::kWhatTargetDurationUpdate: + { + int64_t targetDurationUs; + CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); + mUpSwitchMark = min(kUpSwitchMark, targetDurationUs * 3); + mDownSwitchMark = min(kDownSwitchMark, targetDurationUs * 9 / 4); + mUpSwitchMargin = min(kUpSwitchMargin, targetDurationUs); + break; + } + case PlaylistFetcher::kWhatError: { status_t err; @@ -489,10 +606,23 @@ void LiveSession::onMessageReceived(const sp &msg) { mPacketSources.valueFor( STREAMTYPE_SUBTITLES)->signalEOS(err); - sp notify = mNotify->dup(); - notify->setInt32("what", kWhatError); - notify->setInt32("err", err); - notify->post(); + postError(err); + break; + } + + case PlaylistFetcher::kWhatStopReached: + { + ALOGV("kWhatStopReached"); + + AString uri; + CHECK(msg->findString("uri", &uri)); + + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index < 0) { + break; + } + + tryToFinishBandwidthSwitch(uri); break; } @@ -507,20 +637,67 @@ void LiveSession::onMessageReceived(const sp &msg) { AString uri; CHECK(msg->findString("uri", &uri)); + + // mark new fetcher mToBeResumed ssize_t index = mFetcherInfos.indexOfKey(uri); if (index >= 0) { mFetcherInfos.editValueAt(index).mToBeResumed = true; } - // Resume fetcher for the original variant; the resumed fetcher should - // continue until the timestamps found in msg, which is stored by the - // new fetcher to indicate where the new variant has started buffering. - for (size_t i = 0; i < mFetcherInfos.size(); i++) { - const FetcherInfo info = mFetcherInfos.valueAt(i); - if (info.mToBeRemoved) { - info.mFetcher->resumeUntilAsync(msg); + // temporarily disable packet sources to be swapped to prevent + // NuPlayerDecoder from dequeuing while we check progress + for (size_t i = 0; i < mPacketSources.size(); ++i) { + if ((mSwapMask & mPacketSources.keyAt(i)) + && uri == mStreams[i].mNewUri) { + mPacketSources.editValueAt(i)->enable(false); } } + bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); + // If switching up, require a cushion bigger than kUnderflowMark + // to avoid buffering immediately after the switch. + // (If we don't have that cushion we'd rather cancel and try again.) + int64_t delayUs = switchUp ? (kUnderflowMark + 1000000ll) : 0; + bool needResumeUntil = false; + sp stopParams = msg; + if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { + // playback time hasn't passed startAt time + if (!needResumeUntil) { + for (size_t i = 0; i < kMaxStreams; ++i) { + if ((mSwapMask & indexToType(i)) + && uri == mStreams[i].mNewUri) { + // have to make a copy of mStreams[i].mUri because + // tryToFinishBandwidthSwitch is modifying mStreams[] + AString oldURI = mStreams[i].mUri; + tryToFinishBandwidthSwitch(oldURI); + break; + } + } + } else { + // startAt time is after last enqueue time + // Resume fetcher for the original variant; the resumed fetcher should + // continue until the timestamps found in msg, which is stored by the + // new fetcher to indicate where the new variant has started buffering. + for (size_t i = 0; i < mFetcherInfos.size(); i++) { + const FetcherInfo &info = mFetcherInfos.valueAt(i); + if (info.mToBeRemoved) { + info.mFetcher->resumeUntilAsync(stopParams); + } + } + } + } else { + // playback time passed startAt time + if (switchUp) { + // if switching up, cancel and retry if condition satisfies again + cancelBandwidthSwitch(true /* resume */); + } else { + resumeFetcher(uri, mSwapMask, -1, true /* newUri */); + } + } + // re-enable all packet sources + for (size_t i = 0; i < mPacketSources.size(); ++i) { + mPacketSources.editValueAt(i)->enable(true); + } + break; } @@ -688,8 +865,6 @@ void LiveSession::onConnect(const sp &msg) { mPlaylist->pickRandomMediaItems(); changeConfiguration( 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); - - schedulePollBuffering(); } void LiveSession::finishDisconnect() { @@ -950,6 +1125,43 @@ static double uniformRand() { } #endif +bool LiveSession::resumeFetcher( + const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index < 0) { + ALOGE("did not find fetcher for uri: %s", uri.c_str()); + return false; + } + + bool resume = false; + sp sources[kMaxStreams]; + for (size_t i = 0; i < kMaxStreams; ++i) { + if ((streamMask & indexToType(i)) + && ((!newUri && uri == mStreams[i].mUri) + || (newUri && uri == mStreams[i].mNewUri))) { + resume = true; + if (newUri) { + sources[i] = mPacketSources2.valueFor(indexToType(i)); + sources[i]->clear(); + } else { + sources[i] = mPacketSources.valueFor(indexToType(i)); + } + } + } + + if (resume) { + ALOGV("resuming fetcher %s, timeUs %lld", uri.c_str(), (long long)timeUs); + SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; + mFetcherInfos.editValueAt(index).mFetcher->startAsync( + sources[kAudioIndex], + sources[kVideoIndex], + sources[kSubtitleIndex], + timeUs, -1, -1, seekMode); + } + + return resume; +} + float LiveSession::getAbortThreshold( ssize_t currentBWIndex, ssize_t targetBWIndex) const { float abortThreshold = -1.0f; @@ -983,12 +1195,17 @@ float LiveSession::getAbortThreshold( X/T < bw1 / (bw1 + bw0 - bw) */ + // Taking the measured current bandwidth at 50% face value only, + // as our bandwidth estimation is a lagging indicator. Being + // conservative on this, we prefer switching to lower bandwidth + // unless we're really confident finishing up the last segment + // of higher bandwidth will be fast. CHECK(mLastBandwidthBps >= 0); abortThreshold = (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth - - (float)mLastBandwidthBps * 0.7f); + - (float)mLastBandwidthBps * 0.5f); if (abortThreshold < 0.0f) { abortThreshold = -1.0f; // do not abort } @@ -1128,7 +1345,7 @@ status_t LiveSession::onSeek(const sp &msg) { CHECK(msg->findInt64("timeUs", &timeUs)); if (!mReconfigurationInProgress) { - changeConfiguration(timeUs, mCurBandwidthIndex); + changeConfiguration(timeUs); return OK; } else { return -EWOULDBLOCK; @@ -1184,7 +1401,6 @@ status_t LiveSession::selectTrack(size_t index, bool select) { status_t err = mPlaylist->selectTrack(index, select); if (err == OK) { sp msg = new AMessage(kWhatChangeConfiguration, this); - msg->setInt32("bandwidthIndex", mCurBandwidthIndex); msg->setInt32("pickTrack", select); msg->post(); } @@ -1200,19 +1416,17 @@ ssize_t LiveSession::getSelectedTrack(media_track_type type) const { } void LiveSession::changeConfiguration( - int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { - // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. - // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). + int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { cancelBandwidthSwitch(); CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; - - ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", - timeUs, bandwidthIndex, pickTrack); - - CHECK_LT(bandwidthIndex, mBandwidthItems.size()); - const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); + if (bandwidthIndex >= 0) { + mOrigBandwidthIndex = mCurBandwidthIndex; + mCurBandwidthIndex = bandwidthIndex; + } + CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); + const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); uint32_t streamMask = 0; // streams that should be fetched by the new fetcher uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher @@ -1227,6 +1441,12 @@ void LiveSession::changeConfiguration( // Step 1, stop and discard fetchers that are no longer needed. // Pause those that we'll reuse. for (size_t i = 0; i < mFetcherInfos.size(); ++i) { + // skip fetchers that are marked mToBeRemoved, + // these are done and can't be reused + if (mFetcherInfos[i].mToBeRemoved) { + continue; + } + const AString &uri = mFetcherInfos.keyAt(i); bool discardFetcher = true; @@ -1255,7 +1475,7 @@ void LiveSession::changeConfiguration( } else if (!pickTrack) { // adapting, abort if remaining of current segment is over threshold threshold = getAbortThreshold( - mCurBandwidthIndex, bandwidthIndex); + mOrigBandwidthIndex, mCurBandwidthIndex); } ALOGV("Pausing with threshold %.3f", threshold); @@ -1264,8 +1484,6 @@ void LiveSession::changeConfiguration( } } - mCurBandwidthIndex = bandwidthIndex; - sp msg; if (timeUs < 0ll) { // skip onChangeConfiguration2 (decoder destruction) if not seeking. @@ -1297,10 +1515,9 @@ void LiveSession::changeConfiguration( void LiveSession::onChangeConfiguration(const sp &msg) { if (!mReconfigurationInProgress) { - int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; + int32_t pickTrack = 0; msg->findInt32("pickTrack", &pickTrack); - msg->findInt32("bandwidthIndex", &bandwidthIndex); - changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); + changeConfiguration(-1ll /* timeUs */, -1, pickTrack); } else { msg->post(1000000ll); // retry in 1 sec } @@ -1323,6 +1540,10 @@ void LiveSession::onChangeConfiguration2(const sp &msg) { mPacketSources.editValueAt(i)->clear(); } + for (size_t i = 0; i < kMaxStreams; ++i) { + mStreams[i].mCurDiscontinuitySeq = 0; + } + mDiscontinuityOffsetTimesUs.clear(); mDiscontinuityAbsStartTimesUs.clear(); @@ -1333,6 +1554,10 @@ void LiveSession::onChangeConfiguration2(const sp &msg) { mSeekReplyID.clear(); mSeekReply.clear(); } + + // restart buffer polling after seek becauese previous + // buffering position is no longer valid. + restartPollBuffering(); } uint32_t streamMask, resumeMask; @@ -1407,12 +1632,14 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { int64_t timeUs; int32_t pickTrack; bool switching = false; + bool finishSwitching = false; CHECK(msg->findInt64("timeUs", &timeUs)); CHECK(msg->findInt32("pickTrack", &pickTrack)); if (timeUs < 0ll) { if (!pickTrack) { switching = true; + finishSwitching = (streamMask == 0); } mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; } else { @@ -1440,20 +1667,8 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { ALOGV("resuming fetchers for mask 0x%08x", resumeMask); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { const AString &uri = mFetcherInfos.keyAt(i); - - sp sources[kMaxStreams]; - for (size_t j = 0; j < kMaxStreams; ++j) { - if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { - sources[j] = mPacketSources.valueFor(indexToType(j)); - } - } - FetcherInfo &info = mFetcherInfos.editValueAt(i); - if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL - || sources[kSubtitleIndex] != NULL) { - info.mFetcher->startAsync( - sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], timeUs); - } else { - info.mToBeRemoved = true; + if (!resumeFetcher(uri, resumeMask, timeUs)) { + mFetcherInfos.editValueAt(i).mToBeRemoved = true; } } @@ -1512,24 +1727,42 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { } else { // adapting meta = sources[j]->getLatestEnqueuedMeta(); + if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { + // switching up + meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); + } } - if (meta != NULL && !meta->findInt32("discontinuity", &type)) { + if (j != kSubtitleIndex + && meta != NULL + && !meta->findInt32("discontinuity", &type)) { int64_t tmpUs; int64_t tmpSegmentUs; + int32_t seq; CHECK(meta->findInt64("timeUs", &tmpUs)); CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs)); - if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) { + CHECK(meta->findInt32("discontinuitySeq", &seq)); + // If we're switching and looking for next sample or segment, set the target + // segment start time to tmpSegmentUs + tmpDurationUs / 2, which is + // the middle point of the segment where the last sample was. + // This is needed if segments of the two variants are not perfectly + // aligned. (If the corresponding segment in new variant starts slightly + // later than that in the old variant, we still want the switching to + // start in the next one, not the current one) + if (mStreams[j].mSeekMode == kSeekModeNextSample + || mStreams[j].mSeekMode == kSeekModeNextSegment) { + int64_t tmpDurationUs; + CHECK(meta->findInt64("segmentDurationUs", &tmpDurationUs)); + tmpSegmentUs += tmpDurationUs / 2; + } + if (startTimeUs < 0 || seq > discontinuitySeq + || (seq == discontinuitySeq + && (tmpSegmentUs > segmentStartTimeUs + || (tmpSegmentUs == segmentStartTimeUs + && tmpUs > startTimeUs)))) { startTimeUs = tmpUs; segmentStartTimeUs = tmpSegmentUs; - } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) { - startTimeUs = tmpUs; - } - - int32_t seq; - CHECK(meta->findInt32("discontinuitySeq", &seq)); - if (discontinuitySeq < 0 || seq < discontinuitySeq) { discontinuitySeq = seq; } } @@ -1585,8 +1818,21 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { mReconfigurationInProgress = false; if (switching) { mSwitchInProgress = true; + + if (finishSwitching) { + // Switch is finished now, no new fetchers are created. + // This path is hit when old variant had video and audio from + // two separate fetchers, while new variant has audio only, + // which reuses the previous audio fetcher. + for (size_t i = 0; i < kMaxStreams; ++i) { + if (mSwapMask & indexToType(i)) { + tryToFinishBandwidthSwitch(mStreams[i].mUri); + } + } + } } else { mStreamMask = mNewStreamMask; + mOrigBandwidthIndex = mCurBandwidthIndex; } if (mDisconnectReplyID != NULL) { @@ -1614,21 +1860,20 @@ void LiveSession::swapPacketSource(StreamType stream) { aps2->clear(); } -void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) { +void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { if (!mSwitchInProgress) { return; } - ssize_t index = mFetcherInfos.indexOfKey(uri); + ssize_t index = mFetcherInfos.indexOfKey(oldUri); if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { return; } // Swap packet source of streams provided by old variant for (size_t idx = 0; idx < kMaxStreams; idx++) { - if (uri == mStreams[idx].mUri) { - StreamType stream = indexToType(idx); - + StreamType stream = indexToType(idx); + if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { swapPacketSource(stream); if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { @@ -1642,7 +1887,7 @@ void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) { } } - mFetcherInfos.editValueAt(index).mToBeRemoved = false; + mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); ALOGV("tryToFinishBandwidthSwitch: mSwapMask=%x", mSwapMask); if (mSwapMask != 0) { @@ -1672,30 +1917,20 @@ void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) { for (size_t i = 0; i < mFetcherInfos.size(); ++i) { FetcherInfo &info = mFetcherInfos.editValueAt(i); if (info.mToBeResumed) { - const AString &uri = mFetcherInfos.keyAt(i); - sp sources[kMaxStreams]; - for (size_t j = 0; j < kMaxStreams; ++j) { - if (uri == mStreams[j].mUri) { - sources[j] = mPacketSources.valueFor(indexToType(j)); - } - } - if (sources[kAudioIndex] != NULL - || sources[kVideoIndex] != NULL - || sources[kSubtitleIndex] != NULL) { - ALOGV("resuming fetcher %s", uri.c_str()); - info.mFetcher->startAsync( - sources[kAudioIndex], - sources[kVideoIndex], - sources[kSubtitleIndex]); - } + resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); info.mToBeResumed = false; } } + ALOGI("#### Finished Bandwidth Switch: %zd => %zd", + mOrigBandwidthIndex, mCurBandwidthIndex); + mStreamMask = mNewStreamMask; mSwitchInProgress = false; + mOrigBandwidthIndex = mCurBandwidthIndex; - ALOGI("#### Finished Bandwidth Switch"); + + restartPollBuffering(); } void LiveSession::schedulePollBuffering() { @@ -1706,6 +1941,12 @@ void LiveSession::schedulePollBuffering() { void LiveSession::cancelPollBuffering() { ++mPollBufferingGeneration; + mPrevBufferPercentage = -1; +} + +void LiveSession::restartPollBuffering() { + cancelPollBuffering(); + onPollBuffering(); } void LiveSession::onPollBuffering() { @@ -1714,70 +1955,90 @@ void LiveSession::onPollBuffering() { mSwitchInProgress, mReconfigurationInProgress, mInPreparationPhase, mCurBandwidthIndex, mStreamMask); - bool low, mid, high; - if (checkBuffering(low, mid, high)) { - if (mInPreparationPhase && mid) { + bool underflow, ready, down, up; + if (checkBuffering(underflow, ready, down, up)) { + if (mInPreparationPhase && ready) { postPrepared(OK); } // don't switch before we report prepared if (!mInPreparationPhase) { - switchBandwidthIfNeeded(high, !mid); - } + if (ready) { + stopBufferingIfNecessary(); + } else if (underflow) { + startBufferingIfNecessary(); + } + switchBandwidthIfNeeded(up, down); + } + } schedulePollBuffering(); } -void LiveSession::cancelBandwidthSwitch() { - ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++", mSwitchGeneration); - - mSwitchGeneration++; - mSwitchInProgress = false; - mSwapMask = 0; +void LiveSession::cancelBandwidthSwitch(bool resume) { + ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", + mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); + if (!mSwitchInProgress) { + return; + } for (size_t i = 0; i < mFetcherInfos.size(); ++i) { FetcherInfo& info = mFetcherInfos.editValueAt(i); if (info.mToBeRemoved) { info.mToBeRemoved = false; + if (resume) { + resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); + } } } for (size_t i = 0; i < kMaxStreams; ++i) { - if (!mStreams[i].mNewUri.empty()) { - ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); - if (j < 0) { - mStreams[i].mNewUri.clear(); + AString newUri = mStreams[i].mNewUri; + if (!newUri.empty()) { + // clear all mNewUri matching this newUri + for (size_t j = i; j < kMaxStreams; ++j) { + if (mStreams[j].mNewUri == newUri) { + mStreams[j].mNewUri.clear(); + } + } + ALOGV("stopping newUri = %s", newUri.c_str()); + ssize_t index = mFetcherInfos.indexOfKey(newUri); + if (index < 0) { + ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); continue; } - - const FetcherInfo &info = mFetcherInfos.valueAt(j); + FetcherInfo &info = mFetcherInfos.editValueAt(index); + info.mToBeRemoved = true; info.mFetcher->stopAsync(); - mFetcherInfos.removeItemsAt(j); - mStreams[i].mNewUri.clear(); } } + + ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", + mCurBandwidthIndex, mOrigBandwidthIndex); + + mSwitchGeneration++; + mSwitchInProgress = false; + mCurBandwidthIndex = mOrigBandwidthIndex; + mSwapMask = 0; } -bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) { - low = mid = high = false; +bool LiveSession::checkBuffering( + bool &underflow, bool &ready, bool &down, bool &up) { + underflow = ready = down = up = false; - if (mSwitchInProgress || mReconfigurationInProgress) { + if (mReconfigurationInProgress) { ALOGV("Switch/Reconfig in progress, defer buffer polling"); return false; } - // TODO: Fine tune low/high mark. - // We also need to pause playback if buffering is too low. - // Currently during underflow, we depend on decoder to starve - // to pause, but A/V could have different buffering left, - // they're not paused together. - // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE - - // Switch down if any of the fetchers are below low mark; - // Switch up if all of the fetchers are over high mark. - size_t activeCount, lowCount, midCount, highCount; - activeCount = lowCount = midCount = highCount = 0; + size_t activeCount, underflowCount, readyCount, downCount, upCount; + activeCount = underflowCount = readyCount = downCount = upCount =0; + int32_t minBufferPercent = -1; + int64_t durationUs; + if (getDuration(&durationUs) != OK) { + durationUs = -1; + } for (size_t i = 0; i < mPacketSources.size(); ++i) { // we don't check subtitles for buffering level if (!(mStreamMask & mPacketSources.keyAt(i) @@ -1791,34 +2052,99 @@ bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) { continue; } - ++activeCount; int64_t bufferedDurationUs = mPacketSources[i]->getEstimatedDurationUs(); ALOGV("source[%zu]: buffered %lld us", i, (long long)bufferedDurationUs); - if (bufferedDurationUs < kLowWaterMark) { - ++lowCount; - break; - } else if (bufferedDurationUs > kHighWaterMark) { - ++midCount; - ++highCount; - } else if (bufferedDurationUs > kMidWaterMark) { - ++midCount; + if (durationUs >= 0) { + int32_t percent; + if (mPacketSources[i]->isFinished(0 /* duration */)) { + percent = 100; + } else { + percent = (int32_t)(100.0 * (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); + } + if (minBufferPercent < 0 || percent < minBufferPercent) { + minBufferPercent = percent; + } } + + ++activeCount; + int64_t readyMark = mInPreparationPhase ? kPrepareMark : kReadyMark; + if (bufferedDurationUs > readyMark + || mPacketSources[i]->isFinished(0)) { + ++readyCount; + } + if (!mPacketSources[i]->isFinished(0)) { + if (bufferedDurationUs < kUnderflowMark) { + ++underflowCount; + } + if (bufferedDurationUs > mUpSwitchMark) { + ++upCount; + } else if (bufferedDurationUs < mDownSwitchMark) { + ++downCount; + } + } + } + + if (minBufferPercent >= 0) { + notifyBufferingUpdate(minBufferPercent); } if (activeCount > 0) { - high = (highCount == activeCount); - mid = (midCount == activeCount); - low = (lowCount > 0); + up = (upCount == activeCount); + down = (downCount > 0); + ready = (readyCount == activeCount); + underflow = (underflowCount > 0); return true; } return false; } +void LiveSession::startBufferingIfNecessary() { + ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", + mInPreparationPhase, mBuffering); + if (!mBuffering) { + mBuffering = true; + + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatBufferingStart); + notify->post(); + } +} + +void LiveSession::stopBufferingIfNecessary() { + ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", + mInPreparationPhase, mBuffering); + + if (mBuffering) { + mBuffering = false; + + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatBufferingEnd); + notify->post(); + } +} + +void LiveSession::notifyBufferingUpdate(int32_t percentage) { + if (percentage < mPrevBufferPercentage) { + percentage = mPrevBufferPercentage; + } else if (percentage > 100) { + percentage = 100; + } + + mPrevBufferPercentage = percentage; + + ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); + + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatBufferingUpdate); + notify->setInt32("percentage", percentage); + notify->post(); +} + void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { // no need to check bandwidth if we only have 1 bandwidth settings - if (mBandwidthItems.size() < 2) { + if (mSwitchInProgress || mBandwidthItems.size() < 2) { return; } @@ -1850,6 +2176,22 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { } } +void LiveSession::postError(status_t err) { + // if we reached EOS, notify buffering of 100% + if (err == ERROR_END_OF_STREAM) { + notifyBufferingUpdate(100); + } + // we'll stop buffer polling now, before that notify + // stop buffering to stop the spinning icon + stopBufferingIfNecessary(); + cancelPollBuffering(); + + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatError); + notify->setInt32("err", err); + notify->post(); +} + void LiveSession::postPrepared(status_t err) { CHECK(mInPreparationPhase); @@ -1857,6 +2199,8 @@ void LiveSession::postPrepared(status_t err) { if (err == OK || err == ERROR_END_OF_STREAM) { notify->setInt32("what", kWhatPrepared); } else { + cancelPollBuffering(); + notify->setInt32("what", kWhatPreparationFailed); notify->setInt32("err", err); } diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h index cbf988e..d11675b 100644 --- a/media/libstagefright/httplive/LiveSession.h +++ b/media/libstagefright/httplive/LiveSession.h @@ -89,11 +89,16 @@ struct LiveSession : public AHandler { bool isSeekable() const; bool hasDynamicDuration() const; + static const char *getKeyForStream(StreamType type); + enum { kWhatStreamsChanged, kWhatError, kWhatPrepared, kWhatPreparationFailed, + kWhatBufferingStart, + kWhatBufferingEnd, + kWhatBufferingUpdate, }; protected: @@ -116,9 +121,15 @@ private: kWhatPollBuffering = 'poll', }; - static const int64_t kHighWaterMark; - static const int64_t kMidWaterMark; - static const int64_t kLowWaterMark; + // Bandwidth Switch Mark Defaults + static const int64_t kUpSwitchMark = 25000000ll; + static const int64_t kDownSwitchMark = 18000000ll; + static const int64_t kUpSwitchMargin = 5000000ll; + + // Buffer Prepare/Ready/Underflow Marks + static const int64_t kReadyMark = 5000000ll; + static const int64_t kPrepareMark = 1500000ll; + static const int64_t kUnderflowMark = 1000000ll; struct BandwidthEstimator; struct BandwidthItem { @@ -160,8 +171,10 @@ private: uint32_t mFlags; sp mHTTPService; + bool mBuffering; bool mInPreparationPhase; - bool mBuffering[kMaxStreams]; + int32_t mPollBufferingGeneration; + int32_t mPrevBufferPercentage; sp mHTTPDataSource; KeyedVector mExtraHeaders; @@ -170,6 +183,7 @@ private: Vector mBandwidthItems; ssize_t mCurBandwidthIndex; + ssize_t mOrigBandwidthIndex; int32_t mLastBandwidthBps; sp mBandwidthEstimator; @@ -204,6 +218,10 @@ private: bool mReconfigurationInProgress; bool mSwitchInProgress; + int64_t mUpSwitchMark; + int64_t mDownSwitchMark; + int64_t mUpSwitchMargin; + sp mDisconnectReplyID; sp mSeekReplyID; @@ -213,8 +231,6 @@ private: KeyedVector mDiscontinuityAbsStartTimesUs; KeyedVector mDiscontinuityOffsetTimesUs; - int32_t mPollBufferingGeneration; - sp addFetcher(const char *uri); void onConnect(const sp &msg); @@ -247,6 +263,10 @@ private: sp fetchPlaylist( const char *url, uint8_t *curPlaylistHash, bool *unchanged); + bool resumeFetcher( + const AString &uri, uint32_t streamMask, + int64_t timeUs = -1ll, bool newUri = false); + float getAbortThreshold( ssize_t currentBWIndex, ssize_t targetBWIndex) const; void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); @@ -258,24 +278,32 @@ private: static ssize_t typeToIndex(int32_t type); void changeConfiguration( - int64_t timeUs, size_t bandwidthIndex, bool pickTrack = false); + int64_t timeUs, ssize_t bwIndex = -1, bool pickTrack = false); void onChangeConfiguration(const sp &msg); void onChangeConfiguration2(const sp &msg); void onChangeConfiguration3(const sp &msg); + void swapPacketSource(StreamType stream); - void tryToFinishBandwidthSwitch(const AString &uri); + void tryToFinishBandwidthSwitch(const AString &oldUri); + void cancelBandwidthSwitch(bool resume = false); + bool checkSwitchProgress( + sp &msg, int64_t delayUs, bool *needResumeUntil); - void cancelBandwidthSwitch(); + void switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow); void schedulePollBuffering(); void cancelPollBuffering(); + void restartPollBuffering(); void onPollBuffering(); - bool checkBuffering(bool &low, bool &mid, bool &high); - void switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow); + bool checkBuffering(bool &underflow, bool &ready, bool &down, bool &up); + void startBufferingIfNecessary(); + void stopBufferingIfNecessary(); + void notifyBufferingUpdate(int32_t percentage); void finishDisconnect(); void postPrepared(status_t err); + void postError(status_t err); DISALLOW_EVIL_CONSTRUCTORS(LiveSession); }; diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp index 68f0357..3ace396 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.cpp +++ b/media/libstagefright/httplive/PlaylistFetcher.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -47,7 +48,7 @@ namespace android { // static -const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; +const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll; const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; const int64_t PlaylistFetcher::kFetcherResumeThreshold = 100000ll; // LCM of 188 (size of a TS packet) & 1k works well @@ -157,8 +158,8 @@ PlaylistFetcher::PlaylistFetcher( mSeqNumber(-1), mNumRetries(0), mStartup(true), + mIDRFound(false), mSeekMode(LiveSession::kSeekModeExactPosition), - mPrepared(false), mTimeChangeSignaled(false), mNextPTSTimeUs(-1ll), mMonitorQueueGeneration(0), @@ -208,6 +209,32 @@ int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { return segmentStartUs; } +int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const { + CHECK(mPlaylist != NULL); + + int32_t firstSeqNumberInPlaylist; + if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( + "media-sequence", &firstSeqNumberInPlaylist)) { + firstSeqNumberInPlaylist = 0; + } + + int32_t lastSeqNumberInPlaylist = + firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; + + CHECK_GE(seqNumber, firstSeqNumberInPlaylist); + CHECK_LE(seqNumber, lastSeqNumberInPlaylist); + + int32_t index = seqNumber - firstSeqNumberInPlaylist; + sp itemMeta; + CHECK(mPlaylist->itemAt( + index, NULL /* uri */, &itemMeta)); + + int64_t itemDurationUs; + CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); + + return itemDurationUs; +} + int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { int64_t nowUs = ALooper::GetNowUs(); @@ -559,7 +586,6 @@ status_t PlaylistFetcher::onStart(const sp &msg) { mStopParams.clear(); mStartTimeUsNotify = mNotify->dup(); mStartTimeUsNotify->setInt32("what", kWhatStartedAt); - mStartTimeUsNotify->setInt32("streamMask", 0); mStartTimeUsNotify->setString("uri", mURI); uint32_t streamTypeMask; @@ -604,20 +630,25 @@ status_t PlaylistFetcher::onStart(const sp &msg) { mStreamTypeMask = streamTypeMask; mSegmentStartTimeUs = segmentStartTimeUs; - mDiscontinuitySeq = startDiscontinuitySeq; + + if (startDiscontinuitySeq >= 0) { + mDiscontinuitySeq = startDiscontinuitySeq; + } mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; mSeekMode = (LiveSession::SeekMode) seekMode; + if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) { + mStartup = true; + mIDRFound = false; + mVideoBuffer->clear(); + } + if (startTimeUs >= 0) { mStartTimeUs = startTimeUs; mFirstPTSValid = false; mSeqNumber = -1; - mStartup = true; - mPrepared = false; - mIDRFound = false; mTimeChangeSignaled = false; - mVideoBuffer->clear(); mDownloadState->resetState(); } @@ -663,8 +694,7 @@ status_t PlaylistFetcher::onResumeUntil(const sp &msg) { for (size_t i = 0; i < mPacketSources.size(); i++) { sp packetSource = mPacketSources.valueAt(i); - const char *stopKey; - int streamType = mPacketSources.keyAt(i); + LiveSession::StreamType streamType = mPacketSources.keyAt(i); if (streamType == LiveSession::STREAMTYPE_SUBTITLES) { // the subtitle track can always be stopped @@ -672,18 +702,7 @@ status_t PlaylistFetcher::onResumeUntil(const sp &msg) { continue; } - switch (streamType) { - case LiveSession::STREAMTYPE_VIDEO: - stopKey = "timeUsVideo"; - break; - - case LiveSession::STREAMTYPE_AUDIO: - stopKey = "timeUsAudio"; - break; - - default: - TRESPASS(); - } + const char *stopKey = LiveSession::getKeyForStream(streamType); // check if this stream has too little data left to be resumed int32_t discontinuitySeq; @@ -701,7 +720,7 @@ status_t PlaylistFetcher::onResumeUntil(const sp &msg) { // Don't resume if all streams are within a resume threshold if (stopCount == mPacketSources.size()) { - stopAsync(/* clear = */ false); + notifyStopReached(); return OK; } @@ -711,6 +730,12 @@ status_t PlaylistFetcher::onResumeUntil(const sp &msg) { return OK; } +void PlaylistFetcher::notifyStopReached() { + sp notify = mNotify->dup(); + notify->setInt32("what", kWhatStopReached); + notify->post(); +} + void PlaylistFetcher::notifyError(status_t err) { sp notify = mNotify->dup(); notify->setInt32("what", kWhatError); @@ -730,7 +755,12 @@ void PlaylistFetcher::queueDiscontinuity( void PlaylistFetcher::onMonitorQueue() { bool downloadMore = false; - refreshPlaylist(); + + // in the middle of an unfinished download, delay + // playlist refresh as it'll change seq numbers + if (!mDownloadState->hasSavedState()) { + refreshPlaylist(); + } int32_t targetDurationSecs; int64_t targetDurationUs = kMinBufferedDurationUs; @@ -826,6 +856,13 @@ status_t PlaylistFetcher::refreshPlaylist() { if (mPlaylist->isComplete() || mPlaylist->isEvent()) { updateDuration(); } + // Notify LiveSession to use target-duration based buffering level + // for up/down switch. Default LiveSession::kUpSwitchMark may not + // be reachable for live streams, as our max buffering amount is + // limited to 3 segments. + if (!mPlaylist->isComplete()) { + updateTargetDuration(); + } } mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); @@ -838,17 +875,12 @@ bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp& buffer) { return buffer->size() > 0 && buffer->data()[0] == 0x47; } -bool PlaylistFetcher::shouldPauseDownload(bool startFound) { +bool PlaylistFetcher::shouldPauseDownload() { if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { // doesn't apply to subtitles return false; } - // If we're switching, save state and pause after start point is found - if (mSeekMode != LiveSession::kSeekModeExactPosition && startFound) { - return true; - } - // Calculate threshold to abort current download int32_t targetDurationSecs; CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); @@ -1020,7 +1052,7 @@ bool PlaylistFetcher::initDownloadState( // but since the segments we are supposed to fetch have already rolled off // the playlist, i.e. we have already missed the boat, we inevitably have to // skip. - stopAsync(/* clear = */ false); + notifyStopReached(); return false; } mSeqNumber = lastSeqNumberInPlaylist - 3; @@ -1031,12 +1063,21 @@ bool PlaylistFetcher::initDownloadState( // fall through } else { - ALOGE("Cannot find sequence number %d in playlist " - "(contains %d - %d)", - mSeqNumber, firstSeqNumberInPlaylist, - firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1); + if (mPlaylist != NULL) { + ALOGE("Cannot find sequence number %d in playlist " + "(contains %d - %d)", + mSeqNumber, firstSeqNumberInPlaylist, + firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1); + + notifyError(ERROR_END_OF_STREAM); + } else { + // It's possible that we were never able to download the playlist. + // In this case we should notify error, instead of EOS, as EOS during + // prepare means we succeeded in downloading everything. + ALOGE("Failed to download playlist!"); + notifyError(ERROR_IO); + } - notifyError(ERROR_END_OF_STREAM); return false; } } @@ -1102,8 +1143,10 @@ bool PlaylistFetcher::initDownloadState( // Signal a format discontinuity to ATSParser to clear partial data // from previous streams. Not doing this causes bitstream corruption. - mTSParser->signalDiscontinuity( - ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */); + if (mTSParser != NULL) { + mTSParser->signalDiscontinuity( + ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */); + } queueDiscontinuity( ATSParser::DISCONTINUITY_FORMATCHANGE, @@ -1166,6 +1209,7 @@ void PlaylistFetcher::onDownloadNext() { } // block-wise download + bool shouldPause = false; ssize_t bytesRead; do { sp source = mHTTPDataSource; @@ -1238,21 +1282,34 @@ void PlaylistFetcher::onDownloadNext() { return; } else if (err == ERROR_OUT_OF_RANGE) { // reached stopping point - stopAsync(/* clear = */ false); + notifyStopReached(); return; } else if (err != OK) { notifyError(err); return; - } else if (bytesRead != 0 && - shouldPauseDownload(mStartup != startUp /* startFound */)) { - mDownloadState->saveState( - uri, - itemMeta, - buffer, - tsBuffer, - firstSeqNumberInPlaylist, - lastSeqNumberInPlaylist); - return; + } + // If we're switching, post start notification + // this should only be posted when the last chunk is full processed by TSParser + if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) { + CHECK(mStartTimeUsNotify != NULL); + mStartTimeUsNotify->post(); + mStartTimeUsNotify.clear(); + shouldPause = true; + } + if (shouldPause || shouldPauseDownload()) { + // save state and return if this is not the last chunk, + // leaving the fetcher in paused state. + if (bytesRead != 0) { + mDownloadState->saveState( + uri, + itemMeta, + buffer, + tsBuffer, + firstSeqNumberInPlaylist, + lastSeqNumberInPlaylist); + return; + } + shouldPause = true; } } while (bytesRead != 0); @@ -1290,7 +1347,6 @@ void PlaylistFetcher::onDownloadNext() { return; } - status_t err = OK; if (tsBuffer != NULL) { AString method; CHECK(buffer->meta()->findString("cipher-method", &method)); @@ -1304,30 +1360,40 @@ void PlaylistFetcher::onDownloadNext() { } // bulk extract non-ts files + bool startUp = mStartup; if (tsBuffer == NULL) { - err = extractAndQueueAccessUnits(buffer, itemMeta); + status_t err = extractAndQueueAccessUnits(buffer, itemMeta); if (err == -EAGAIN) { // starting sequence number too low/high postMonitorQueue(); return; } else if (err == ERROR_OUT_OF_RANGE) { // reached stopping point - stopAsync(/* clear = */false); + notifyStopReached(); + return; + } else if (err != OK) { + notifyError(err); return; } } - if (err != OK) { - notifyError(err); - return; + ++mSeqNumber; + + // if adapting, pause after found the next starting point + if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) { + CHECK(mStartTimeUsNotify != NULL); + mStartTimeUsNotify->post(); + mStartTimeUsNotify.clear(); + shouldPause = true; } - ++mSeqNumber; - postMonitorQueue(); + if (!shouldPause) { + postMonitorQueue(); + } } int32_t PlaylistFetcher::getSeqNumberWithAnchorTime( - int64_t anchorTimeUs, int64_t targetDurationUs) const { + int64_t anchorTimeUs, int64_t targetDiffUs) const { int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { @@ -1336,8 +1402,8 @@ int32_t PlaylistFetcher::getSeqNumberWithAnchorTime( lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1; int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1; - // adjust anchorTimeUs to within 1x targetDurationUs from mStartTimeUs - while (index >= 0 && anchorTimeUs - mStartTimeUs > targetDurationUs) { + // adjust anchorTimeUs to within targetDiffUs from mStartTimeUs + while (index >= 0 && anchorTimeUs - mStartTimeUs > targetDiffUs) { sp itemMeta; CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); @@ -1439,6 +1505,7 @@ const sp &PlaylistFetcher::setAccessUnitProperties( accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); + accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber)); return accessUnit; } @@ -1477,30 +1544,15 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu for (size_t i = mPacketSources.size(); i-- > 0;) { sp packetSource = mPacketSources.valueAt(i); - const char *key; - ATSParser::SourceType type; const LiveSession::StreamType stream = mPacketSources.keyAt(i); - switch (stream) { - case LiveSession::STREAMTYPE_VIDEO: - type = ATSParser::VIDEO; - key = "timeUsVideo"; - break; - - case LiveSession::STREAMTYPE_AUDIO: - type = ATSParser::AUDIO; - key = "timeUsAudio"; - break; - - case LiveSession::STREAMTYPE_SUBTITLES: - { - ALOGE("MPEG2 Transport streams do not contain subtitles."); - return ERROR_MALFORMED; - break; - } - - default: - TRESPASS(); + if (stream == LiveSession::STREAMTYPE_SUBTITLES) { + ALOGE("MPEG2 Transport streams do not contain subtitles."); + return ERROR_MALFORMED; } + const char *key = LiveSession::getKeyForStream(stream); + ATSParser::SourceType type = + (stream == LiveSession::STREAMTYPE_AUDIO) ? + ATSParser::AUDIO : ATSParser::VIDEO; sp source = static_cast( @@ -1523,8 +1575,58 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu int64_t timeUs; CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); + bool seeking = mSeekMode == LiveSession::kSeekModeExactPosition; if (mSegmentFirstPTS < 0ll) { mSegmentFirstPTS = timeUs; + if (!seeking) { + int32_t firstSeqNumberInPlaylist; + if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( + "media-sequence", &firstSeqNumberInPlaylist)) { + firstSeqNumberInPlaylist = 0; + } + + int32_t targetDurationSecs; + CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); + int64_t targetDurationUs = targetDurationSecs * 1000000ll; + // mStartup + // mStartup is true until we have queued a packet for all the streams + // we are fetching. We queue packets whose timestamps are greater than + // mStartTimeUs. + // mSegmentStartTimeUs >= 0 + // mSegmentStartTimeUs is non-negative when adapting or switching tracks + // mSeqNumber > firstSeqNumberInPlaylist + // don't decrement mSeqNumber if it already points to the 1st segment + // timeUs - mStartTimeUs > targetDurationUs: + // This and the 2 above conditions should only happen when adapting in a live + // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher + // would start fetching after timeUs, which should be greater than mStartTimeUs; + // the old fetcher would then continue fetching data until timeUs. We don't want + // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to + // stop as early as possible. The definition of being "too far ahead" is + // arbitrary; here we use targetDurationUs as threshold. + int64_t targetDiffUs =(mSeekMode == LiveSession::kSeekModeNextSample + ? 0 : targetDurationUs); + if (mStartup && mSegmentStartTimeUs >= 0 + && mSeqNumber > firstSeqNumberInPlaylist + && timeUs - mStartTimeUs > targetDiffUs) { + // we just guessed a starting timestamp that is too high when adapting in a + // live stream; re-adjust based on the actual timestamp extracted from the + // media segment; if we didn't move backward after the re-adjustment + // (newSeqNumber), start at least 1 segment prior. + int32_t newSeqNumber = getSeqNumberWithAnchorTime( + timeUs, targetDiffUs); + if (newSeqNumber >= mSeqNumber) { + --mSeqNumber; + } else { + mSeqNumber = newSeqNumber; + } + mStartTimeUsNotify = mNotify->dup(); + mStartTimeUsNotify->setInt32("what", kWhatStartedAt); + mStartTimeUsNotify->setString("uri", mURI); + mIDRFound = false; + return -EAGAIN; + } + } } if (mStartup) { if (!mFirstPTSValid) { @@ -1538,10 +1640,8 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu } } - bool seeking = mSeekMode == LiveSession::kSeekModeExactPosition; bool startTimeReached = - seeking ? (timeUs >= mStartTimeUs) - : (timeUs > mStartTimeUs); + seeking ? (timeUs >= mStartTimeUs) : true; if (!startTimeReached || (isAvc && !mIDRFound)) { // buffer up to the closest preceding IDR frame in the next segement, @@ -1562,51 +1662,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu } if (mStartTimeUsNotify != NULL) { - int32_t firstSeqNumberInPlaylist; - if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( - "media-sequence", &firstSeqNumberInPlaylist)) { - firstSeqNumberInPlaylist = 0; - } - - int32_t targetDurationSecs; - CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); - int64_t targetDurationUs = targetDurationSecs * 1000000ll; - // mStartup - // mStartup is true until we have queued a packet for all the streams - // we are fetching. We queue packets whose timestamps are greater than - // mStartTimeUs. - // mSegmentStartTimeUs >= 0 - // mSegmentStartTimeUs is non-negative when adapting or switching tracks - // mSeqNumber > firstSeqNumberInPlaylist - // don't decrement mSeqNumber if it already points to the 1st segment - // timeUs - mStartTimeUs > targetDurationUs: - // This and the 2 above conditions should only happen when adapting in a live - // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher - // would start fetching after timeUs, which should be greater than mStartTimeUs; - // the old fetcher would then continue fetching data until timeUs. We don't want - // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to - // stop as early as possible. The definition of being "too far ahead" is - // arbitrary; here we use targetDurationUs as threshold. - if (mStartup && mSegmentStartTimeUs >= 0 - && mSeqNumber > firstSeqNumberInPlaylist - && timeUs - mStartTimeUs > targetDurationUs) { - // we just guessed a starting timestamp that is too high when adapting in a - // live stream; re-adjust based on the actual timestamp extracted from the - // media segment; if we didn't move backward after the re-adjustment - // (newSeqNumber), start at least 1 segment prior. - int32_t newSeqNumber = getSeqNumberWithAnchorTime( - timeUs, targetDurationUs); - if (newSeqNumber >= mSeqNumber) { - --mSeqNumber; - } else { - mSeqNumber = newSeqNumber; - } - mStartTimeUsNotify = mNotify->dup(); - mStartTimeUsNotify->setInt32("what", kWhatStartedAt); - mIDRFound = false; - return -EAGAIN; - } - int32_t seq; if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) { mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); @@ -1622,12 +1677,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu if (streamMask == mStreamTypeMask) { mStartup = false; - // only need to post if we're switching and searching for a - // start point in next segment, or next IDR - if (mSeekMode != LiveSession::kSeekModeExactPosition) { - mStartTimeUsNotify->post(); - } - mStartTimeUsNotify.clear(); } } } @@ -1887,11 +1936,13 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); int64_t targetDurationUs = targetDurationSecs * 1000000ll; + int64_t targetDiffUs =(mSeekMode == LiveSession::kSeekModeNextSample + ? 0 : targetDurationUs); // Duplicated logic from how we handle .ts playlists. if (mStartup && mSegmentStartTimeUs >= 0 - && timeUs - mStartTimeUs > targetDurationUs) { + && timeUs - mStartTimeUs > targetDiffUs) { int32_t newSeqNumber = getSeqNumberWithAnchorTime( - timeUs, targetDurationUs); + timeUs, targetDiffUs); if (newSeqNumber >= mSeqNumber) { --mSeqNumber; } else { @@ -1903,8 +1954,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( mStartTimeUsNotify->setInt64("timeUsAudio", timeUs); mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); - mStartTimeUsNotify->post(); - mStartTimeUsNotify.clear(); mStartup = false; } } @@ -1953,4 +2002,15 @@ void PlaylistFetcher::updateDuration() { msg->post(); } +void PlaylistFetcher::updateTargetDuration() { + int32_t targetDurationSecs; + CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); + int64_t targetDurationUs = targetDurationSecs * 1000000ll; + + sp msg = mNotify->dup(); + msg->setInt32("what", kWhatTargetDurationUpdate); + msg->setInt64("targetDurationUs", targetDurationUs); + msg->post(); +} + } // namespace android diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h index 8d34cbc..dab56df 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.h +++ b/media/libstagefright/httplive/PlaylistFetcher.h @@ -44,9 +44,11 @@ struct PlaylistFetcher : public AHandler { kWhatStopped, kWhatError, kWhatDurationUpdate, + kWhatTargetDurationUpdate, kWhatPrepared, kWhatPreparationFailed, kWhatStartedAt, + kWhatStopReached, }; PlaylistFetcher( @@ -64,7 +66,7 @@ struct PlaylistFetcher : public AHandler { int64_t startTimeUs = -1ll, // starting timestamps int64_t segmentStartTimeUs = -1ll, // starting position within playlist // startTimeUs!=segmentStartTimeUs only when playlist is live - int32_t startDiscontinuitySeq = 0, + int32_t startDiscontinuitySeq = -1, LiveSession::SeekMode seekMode = LiveSession::kSeekModeExactPosition); void pauseAsync(float thresholdRatio); @@ -135,7 +137,6 @@ private: bool mStartup; bool mIDRFound; int32_t mSeekMode; - bool mPrepared; bool mTimeChangeSignaled; int64_t mNextPTSTimeUs; @@ -187,7 +188,7 @@ private: void postMonitorQueue(int64_t delayUs = 0, int64_t minDelayUs = 0); void cancelMonitorQueue(); void setStoppingThreshold(float thresholdRatio); - bool shouldPauseDownload(bool startFound); + bool shouldPauseDownload(); int64_t delayUsToRefreshPlaylist() const; status_t refreshPlaylist(); @@ -195,6 +196,8 @@ private: // Returns the media time in us of the segment specified by seqNumber. // This is computed by summing the durations of all segments before it. int64_t getSegmentStartTimeUs(int32_t seqNumber) const; + // Returns the duration time in us of the segment specified. + int64_t getSegmentDurationUs(int32_t seqNumber) const; status_t onStart(const sp &msg); void onPause(); @@ -219,6 +222,7 @@ private: status_t extractAndQueueAccessUnits( const sp &buffer, const sp &itemMeta); + void notifyStopReached(); void notifyError(status_t err); void queueDiscontinuity( @@ -230,6 +234,7 @@ private: int32_t getSeqNumberForTime(int64_t timeUs) const; void updateDuration(); + void updateTargetDuration(); DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher); }; diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp index 9f42217..c2f1527 100644 --- a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp +++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp @@ -19,6 +19,8 @@ #include "AnotherPacketSource.h" +#include "include/avc_utils.h" + #include #include #include @@ -38,6 +40,7 @@ const int64_t kNearEOSMarkUs = 2000000ll; // 2 secs AnotherPacketSource::AnotherPacketSource(const sp &meta) : mIsAudio(false), mIsVideo(false), + mEnabled(true), mFormat(NULL), mLastQueuedTimeUs(0), mEOSResult(OK), @@ -155,7 +158,6 @@ status_t AnotherPacketSource::read( const sp buffer = *mBuffers.begin(); mBuffers.erase(mBuffers.begin()); - mLatestDequeuedMeta = buffer->meta()->dup(); int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { @@ -166,6 +168,8 @@ status_t AnotherPacketSource::read( return INFO_DISCONTINUITY; } + mLatestDequeuedMeta = buffer->meta()->dup(); + sp object; if (buffer->meta()->findObject("format", &object)) { setFormat(static_cast(object.get())); @@ -205,20 +209,26 @@ void AnotherPacketSource::queueAccessUnit(const sp &buffer) { return; } - int64_t lastQueuedTimeUs; - CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs)); - mLastQueuedTimeUs = lastQueuedTimeUs; - ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)", mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6); - Mutex::Autolock autoLock(mLock); mBuffers.push_back(buffer); mCondition.signal(); int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { + // discontinuity handling needs to be consistent with queueDiscontinuity() ++mQueuedDiscontinuityCount; + mLastQueuedTimeUs = 0ll; + mEOSResult = OK; + mLatestEnqueuedMeta = NULL; + return; } + int64_t lastQueuedTimeUs; + CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs)); + mLastQueuedTimeUs = lastQueuedTimeUs; + ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)", + mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6); + if (mLatestEnqueuedMeta == NULL) { mLatestEnqueuedMeta = buffer->meta()->dup(); } else { @@ -299,6 +309,9 @@ void AnotherPacketSource::signalEOS(status_t result) { bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) { Mutex::Autolock autoLock(mLock); *finalResult = OK; + if (!mEnabled) { + return false; + } if (!mBuffers.empty()) { return true; } @@ -310,6 +323,9 @@ bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) { bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) { Mutex::Autolock autoLock(mLock); *finalResult = OK; + if (!mEnabled) { + return false; + } List >::iterator it; for (it = mBuffers.begin(); it != mBuffers.end(); it++) { int32_t discontinuity; @@ -440,4 +456,140 @@ sp AnotherPacketSource::getLatestDequeuedMeta() { return mLatestDequeuedMeta; } +void AnotherPacketSource::enable(bool enable) { + Mutex::Autolock autoLock(mLock); + mEnabled = enable; +} + +sp AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) { + Mutex::Autolock autoLock(mLock); + int64_t firstUs = -1; + int64_t lastUs = -1; + int64_t durationUs = 0; + + List >::iterator it; + for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { + const sp &buffer = *it; + int32_t discontinuity; + if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { + durationUs += lastUs - firstUs; + firstUs = -1; + lastUs = -1; + continue; + } + int64_t timeUs; + if (buffer->meta()->findInt64("timeUs", &timeUs)) { + if (firstUs < 0) { + firstUs = timeUs; + } + if (lastUs < 0 || timeUs > lastUs) { + lastUs = timeUs; + } + if (durationUs + (lastUs - firstUs) >= delayUs) { + return buffer->meta(); + } + } + } + return mLatestEnqueuedMeta; +} + +void AnotherPacketSource::trimBuffersAfterTimeUs( + size_t discontinuitySeq, int64_t timeUs) { + ALOGV("trimBuffersAfterTimeUs: discontinuitySeq %zu, timeUs %lld", + discontinuitySeq, (long long)timeUs); + + Mutex::Autolock autoLock(mLock); + if (mBuffers.empty()) { + return; + } + + List >::iterator it; + sp newLatestEnqueuedMeta = NULL; + int64_t newLastQueuedTimeUs = 0; + size_t newDiscontinuityCount = 0; + for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { + const sp &buffer = *it; + int32_t discontinuity; + if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { + newDiscontinuityCount++; + continue; + } + size_t curDiscontinuitySeq; + int64_t curTimeUs; + CHECK(buffer->meta()->findInt32( + "discontinuitySeq", (int32_t*)&curDiscontinuitySeq)); + CHECK(buffer->meta()->findInt64("timeUs", &curTimeUs)); + if ((curDiscontinuitySeq > discontinuitySeq + || (curDiscontinuitySeq == discontinuitySeq + && curTimeUs >= timeUs))) { + ALOGI("trimming from %lld (inclusive) to end", + (long long)curTimeUs); + break; + } + newLatestEnqueuedMeta = buffer->meta(); + newLastQueuedTimeUs = curTimeUs; + } + mBuffers.erase(it, mBuffers.end()); + mLatestEnqueuedMeta = newLatestEnqueuedMeta; + mLastQueuedTimeUs = newLastQueuedTimeUs; + mQueuedDiscontinuityCount = newDiscontinuityCount; +} + +sp AnotherPacketSource::trimBuffersBeforeTimeUs( + size_t discontinuitySeq, int64_t timeUs) { + ALOGV("trimBuffersBeforeTimeUs: discontinuitySeq %zu, timeUs %lld", + discontinuitySeq, (long long)timeUs); + sp meta; + Mutex::Autolock autoLock(mLock); + if (mBuffers.empty()) { + return NULL; + } + + sp format; + bool isAvc = false; + + List >::iterator it; + size_t discontinuityCount = 0; + for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { + const sp &buffer = *it; + int32_t discontinuity; + if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { + format = NULL; + isAvc = false; + discontinuityCount++; + continue; + } + if (format == NULL) { + sp object; + if (buffer->meta()->findObject("format", &object)) { + const char* mime; + format = static_cast(object.get()); + isAvc = format != NULL + && format->findCString(kKeyMIMEType, &mime) + && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC); + } + } + if (isAvc && !IsIDR(buffer)) { + continue; + } + size_t curDiscontinuitySeq; + int64_t curTimeUs; + CHECK(buffer->meta()->findInt32( + "discontinuitySeq", (int32_t*)&curDiscontinuitySeq)); + CHECK(buffer->meta()->findInt64("timeUs", &curTimeUs)); + if ((curDiscontinuitySeq > discontinuitySeq + || (curDiscontinuitySeq == discontinuitySeq + && curTimeUs > timeUs))) { + ALOGI("trimming from beginning to %lld (not inclusive)", + (long long)curTimeUs); + meta = buffer->meta(); + break; + } + } + mBuffers.erase(mBuffers.begin(), it); + mQueuedDiscontinuityCount -= discontinuityCount; + mLatestDequeuedMeta = NULL; + return meta; +} + } // namespace android diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.h b/media/libstagefright/mpeg2ts/AnotherPacketSource.h index d4fde7c..e126006 100644 --- a/media/libstagefright/mpeg2ts/AnotherPacketSource.h +++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.h @@ -70,8 +70,14 @@ struct AnotherPacketSource : public MediaSource { bool isFinished(int64_t duration) const; + void enable(bool enable); + sp getLatestEnqueuedMeta(); sp getLatestDequeuedMeta(); + sp getMetaAfterLastDequeued(int64_t delayUs); + + void trimBuffersAfterTimeUs(size_t discontinuitySeq, int64_t timeUs); + sp trimBuffersBeforeTimeUs(size_t discontinuitySeq, int64_t timeUs); protected: virtual ~AnotherPacketSource(); @@ -82,6 +88,7 @@ private: bool mIsAudio; bool mIsVideo; + bool mEnabled; sp mFormat; int64_t mLastQueuedTimeUs; List > mBuffers; -- cgit v1.1