From a48d372833ccec13c96ece9efcc226e8beac7f59 Mon Sep 17 00:00:00 2001 From: Chong Zhang Date: Wed, 18 Mar 2015 10:31:13 -0700 Subject: HLS: allow pause/resume in the middle of a segment - when down switching, decide whether to finish current segment based on bandwidth settings, abort current segment if needed. - when switching, pause new fetcher after the first 47K chunk, and go back to resume old fethcer to stop point immediately. - when old fetcher reaches stop point, swap packet sources and resume new fetcher. - mark switching as done as soon as old fecther reaches stop point. This allows us to resume bandwidth monitoring earlier, and do subsequent switches sooner. bug: 19567254 Change-Id: Iba4b5fb9b06541bb1e49592536648f5d4cbc69ab --- media/libstagefright/httplive/LiveSession.cpp | 412 ++++++++++++--------- media/libstagefright/httplive/LiveSession.h | 44 +-- media/libstagefright/httplive/PlaylistFetcher.cpp | 354 ++++++++++++++---- media/libstagefright/httplive/PlaylistFetcher.h | 30 +- media/libstagefright/mpeg2ts/ATSParser.h | 3 + .../libstagefright/mpeg2ts/AnotherPacketSource.cpp | 16 + media/libstagefright/mpeg2ts/AnotherPacketSource.h | 4 + 7 files changed, 586 insertions(+), 277 deletions(-) (limited to 'media') diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index f5328a6..738f8b6 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -128,6 +128,7 @@ LiveSession::LiveSession( mInPreparationPhase(true), mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), mCurBandwidthIndex(-1), + mLastBandwidthBps(-1ll), mBandwidthEstimator(new BandwidthEstimator()), mStreamMask(0), mNewStreamMask(0), @@ -159,24 +160,6 @@ LiveSession::~LiveSession() { } } -sp LiveSession::createFormatChangeBuffer(bool swap) { - ABuffer *discontinuity = new ABuffer(0); - discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); - discontinuity->meta()->setInt32("swapPacketSource", swap); - discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); - discontinuity->meta()->setInt64("timeUs", -1); - return discontinuity; -} - -void LiveSession::swapPacketSource(StreamType stream) { - sp &aps = mPacketSources.editValueFor(stream); - sp &aps2 = mPacketSources2.editValueFor(stream); - sp tmp = aps; - aps = aps2; - aps2 = tmp; - aps2->clear(); -} - status_t LiveSession::dequeueAccessUnit( StreamType stream, sp *accessUnit) { if (!(mStreamMask & stream)) { @@ -189,7 +172,13 @@ status_t LiveSession::dequeueAccessUnit( sp packetSource = mPacketSources.valueFor(stream); ssize_t idx = typeToIndex(stream); - if (!packetSource->hasBufferAvailable(&finalResult)) { + // Do not let client pull data if we don't have data packets yet. + // We might only have a format discontinuity queued without data. + // When NuPlayerDecoder dequeues the format discontinuity, it will + // immediately try to getFormat. If we return NULL, NuPlayerDecoder + // thinks it can do seamless change, so will not shutdown decoder. + // When the actual format arrives, it can't handle it and get stuck. + if (!packetSource->hasDataBufferAvailable(&finalResult)) { if (finalResult == OK) { return -EAGAIN; } else { @@ -197,49 +186,8 @@ status_t LiveSession::dequeueAccessUnit( } } - // Do not let client pull data if we don't have format yet. - // We might only have a format discontinuity queued without actual data. - // When NuPlayerDecoder dequeues the format discontinuity, it will - // immediately try to getFormat. If we return NULL, NuPlayerDecoder - // thinks it can do seamless change, so will not shutdown decoder. - // When the actual format arrives, it can't handle it and get stuck. - // TODO: We need a method to check if the packet source has any - // data packets available, dequeuing should only start then. - sp format = packetSource->getFormat(); - if (format == NULL) { - return -EAGAIN; - } - int32_t targetDuration = 0; - sp meta = packetSource->getLatestEnqueuedMeta(); - if (meta != NULL) { - meta->findInt32("targetDuration", &targetDuration); - } - - int64_t targetDurationUs = targetDuration * 1000000ll; - if (targetDurationUs == 0 || - targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) { - // Fetchers limit buffering to - // min(3 * targetDuration, kMinBufferedDurationUs) - targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs; - } - - // wait for counterpart - sp otherSource; - uint32_t mask = mNewStreamMask & mStreamMask; - uint32_t fetchersMask = 0; - for (size_t i = 0; i < mFetcherInfos.size(); ++i) { - uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); - fetchersMask |= fetcherMask; - } - mask &= fetchersMask; - if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { - otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); - } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { - otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); - } - if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { - return finalResult == OK ? -EAGAIN : finalResult; - } + // Let the client dequeue as long as we have buffers available + // Do not make pause/resume decisions here. status_t err = packetSource->dequeueAccessUnit(accessUnit); @@ -278,41 +226,25 @@ status_t LiveSession::dequeueAccessUnit( type, extra == NULL ? "NULL" : extra->debugString().c_str()); - int32_t swap; - if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { - int32_t switchGeneration; - CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); - { - Mutex::Autolock lock(mSwapMutex); - if (switchGeneration == mSwitchGeneration) { - swapPacketSource(stream); - sp msg = new AMessage(kWhatSwapped, this); - msg->setInt32("stream", stream); - msg->setInt32("switchGeneration", switchGeneration); - msg->post(); - } - } + size_t seq = strm.mCurDiscontinuitySeq; + int64_t offsetTimeUs; + if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { + offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); } else { - size_t seq = strm.mCurDiscontinuitySeq; - int64_t offsetTimeUs; - if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { - offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); - } else { - offsetTimeUs = 0; - } - - seq += 1; - if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { - int64_t firstTimeUs; - firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); - offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; - offsetTimeUs += strm.mLastSampleDurationUs; - } else { - offsetTimeUs += strm.mLastSampleDurationUs; - } + offsetTimeUs = 0; + } - mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); + seq += 1; + if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + int64_t firstTimeUs; + firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); + offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; + offsetTimeUs += strm.mLastSampleDurationUs; + } else { + offsetTimeUs += strm.mLastSampleDurationUs; } + + mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); } else if (err == OK) { if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { @@ -373,7 +305,6 @@ status_t LiveSession::dequeueAccessUnit( } status_t LiveSession::getStreamFormat(StreamType stream, sp *format) { - // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. if (!(mStreamMask & stream)) { return UNKNOWN_ERROR; } @@ -389,6 +320,10 @@ status_t LiveSession::getStreamFormat(StreamType stream, sp *format) { return convertMetaDataToMessage(meta, format); } +sp LiveSession::getHTTPDataSource() { + return new MediaHTTP(mHTTPService->makeHTTPConnection()); +} + void LiveSession::connectAsync( const char *url, const KeyedVector *headers) { sp msg = new AMessage(kWhatConnect, this); @@ -468,21 +403,27 @@ void LiveSession::onMessageReceived(const sp &msg) { case PlaylistFetcher::kWhatPaused: case PlaylistFetcher::kWhatStopped: { + AString uri; + CHECK(msg->findString("uri", &uri)); + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index < 0) { + // ignore msgs from fetchers that's already gone + break; + } + if (what == PlaylistFetcher::kWhatStopped) { - AString uri; - CHECK(msg->findString("uri", &uri)); - ssize_t index = mFetcherInfos.indexOfKey(uri); - if (index < 0) { - // ignore duplicated kWhatStopped messages. - break; - } + tryToFinishBandwidthSwitch(uri); mFetcherLooper->unregisterHandler( mFetcherInfos[index].mFetcher->id()); mFetcherInfos.removeItemsAt(index); - - if (mSwitchInProgress) { - tryToFinishBandwidthSwitch(); + } else if (what == PlaylistFetcher::kWhatPaused) { + int32_t seekMode; + CHECK(msg->findInt32("seekMode", &seekMode)); + for (size_t i = 0; i < kMaxStreams; ++i) { + if (mStreams[i].mUri == uri) { + mStreams[i].mSeekMode = (SeekMode) seekMode; + } } } @@ -564,6 +505,13 @@ void LiveSession::onMessageReceived(const sp &msg) { break; } + AString uri; + CHECK(msg->findString("uri", &uri)); + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index >= 0) { + mFetcherInfos.editValueAt(index).mToBeResumed = true; + } + // Resume fetcher for the original variant; the resumed fetcher should // continue until the timestamps found in msg, which is stored by the // new fetcher to indicate where the new variant has started buffering. @@ -607,12 +555,6 @@ void LiveSession::onMessageReceived(const sp &msg) { break; } - case kWhatSwapped: - { - onSwapped(msg); - break; - } - case kWhatPollBuffering: { int32_t generation; @@ -751,11 +693,10 @@ void LiveSession::onConnect(const sp &msg) { } void LiveSession::finishDisconnect() { + ALOGV("finishDisconnect"); + // No reconfiguration is currently pending, make sure none will trigger // during disconnection either. - - // Protect mPacketSources from a swapPacketSource race condition through disconnect. - // (finishDisconnect, onFinishDisconnect2) cancelBandwidthSwitch(); // cancel buffer polling @@ -805,8 +746,8 @@ sp LiveSession::addFetcher(const char *uri) { FetcherInfo info; info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); info.mDurationUs = -1ll; - info.mIsPrepared = false; info.mToBeRemoved = false; + info.mToBeResumed = false; mFetcherLooper->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); @@ -835,14 +776,15 @@ ssize_t LiveSession::fetchFile( int64_t range_offset, int64_t range_length, uint32_t block_size, /* download block size */ sp *source, /* to return and reuse source */ - String8 *actualUrl) { + String8 *actualUrl, + bool forceConnectHTTP /* force connect HTTP when resuing source */) { off64_t size; sp temp_source; if (source == NULL) { source = &temp_source; } - if (*source == NULL) { + if (*source == NULL || forceConnectHTTP) { if (!strncasecmp(url, "file://", 7)) { *source = new FileSource(url + 7); } else if (strncasecmp(url, "http://", 7) @@ -861,13 +803,18 @@ ssize_t LiveSession::fetchFile( ? "" : AStringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str())); } - status_t err = mHTTPDataSource->connect(url, &headers); + + HTTPBase* httpDataSource = + (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get(); + status_t err = httpDataSource->connect(url, &headers); if (err != OK) { return err; } - *source = mHTTPDataSource; + if (*source == NULL) { + *source = mHTTPDataSource; + } } } @@ -1003,6 +950,57 @@ static double uniformRand() { } #endif +float LiveSession::getAbortThreshold( + ssize_t currentBWIndex, ssize_t targetBWIndex) const { + float abortThreshold = -1.0f; + if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { + /* + If we're switching down, we need to decide whether to + + 1) finish last segment of high-bandwidth variant, or + 2) abort last segment of high-bandwidth variant, and fetch an + overlapping portion from low-bandwidth variant. + + Here we try to maximize the amount of buffer left when the + switch point is met. Given the following parameters: + + B: our current buffering level in seconds + T: target duration in seconds + X: sample duration in seconds remain to fetch in last segment + bw0: bandwidth of old variant (as specified in playlist) + bw1: bandwidth of new variant (as specified in playlist) + bw: measured bandwidth available + + If we choose 1), when switch happens at the end of current + segment, our buffering will be + B + X - X * bw0 / bw + + If we choose 2), when switch happens where we aborted current + segment, our buffering will be + B - (T - X) * bw1 / bw + + We should only choose 1) if + X/T < bw1 / (bw1 + bw0 - bw) + */ + + CHECK(mLastBandwidthBps >= 0); + abortThreshold = + (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth + / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth + + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth + - (float)mLastBandwidthBps * 0.7f); + if (abortThreshold < 0.0f) { + abortThreshold = -1.0f; // do not abort + } + ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", + mBandwidthItems.itemAt(currentBWIndex).mBandwidth, + mBandwidthItems.itemAt(targetBWIndex).mBandwidth, + mLastBandwidthBps, + abortThreshold); + } + return abortThreshold; +} + void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); } @@ -1210,8 +1208,6 @@ void LiveSession::changeConfiguration( CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; - mCurBandwidthIndex = bandwidthIndex; - ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", timeUs, bandwidthIndex, pickTrack); @@ -1238,7 +1234,6 @@ void LiveSession::changeConfiguration( if (timeUs < 0ll) { // delay fetcher removal if not picking tracks discardFetcher = pickTrack; - } for (size_t j = 0; j < kMaxStreams; ++j) { @@ -1253,12 +1248,24 @@ void LiveSession::changeConfiguration( if (discardFetcher) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); } else { - // if we're seeking, pause immediately (no need to finish the segment) - bool immediate = (timeUs >= 0ll); - mFetcherInfos.valueAt(i).mFetcher->pauseAsync(immediate); + float threshold = -1.0f; // always finish fetching by default + if (timeUs >= 0ll) { + // seeking, no need to finish fetching + threshold = 0.0f; + } else if (!pickTrack) { + // adapting, abort if remaining of current segment is over threshold + threshold = getAbortThreshold( + mCurBandwidthIndex, bandwidthIndex); + } + + ALOGV("Pausing with threshold %.3f", threshold); + + mFetcherInfos.valueAt(i).mFetcher->pauseAsync(threshold); } } + mCurBandwidthIndex = bandwidthIndex; + sp msg; if (timeUs < 0ll) { // skip onChangeConfiguration2 (decoder destruction) if not seeking. @@ -1451,7 +1458,6 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { } // streamMask now only contains the types that need a new fetcher created. - if (streamMask != 0) { ALOGV("creating new fetchers for mask 0x%08x", streamMask); } @@ -1472,6 +1478,7 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { int64_t startTimeUs = -1; int64_t segmentStartTimeUs = -1ll; int32_t discontinuitySeq = -1; + SeekMode seekMode = kSeekModeExactPosition; sp sources[kMaxStreams]; if (i == kSubtitleIndex) { @@ -1491,6 +1498,16 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { sp meta; if (pickTrack) { // selecting + + // FIXME: + // This should only apply to the track that's being picked, we + // need a bitmask to indicate that. + // + // It's possible that selectTrack() gets called during a bandwidth + // switch, and we needed to fetch a new variant. The new fetcher + // should start from where old fetcher left off, not where decoder + // is dequeueing at. + meta = sources[j]->getLatestDequeuedMeta(); } else { // adapting @@ -1527,14 +1544,22 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { ALOGV("stream[%zu]: queue format change", j); sources[j]->queueDiscontinuity( - ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); + ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); } else { // adapting, queue discontinuities after resume sources[j] = mPacketSources2.valueFor(indexToType(j)); sources[j]->clear(); uint32_t extraStreams = mNewStreamMask & (~mStreamMask); if (extraStreams & indexToType(j)) { - sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); + sources[j]->queueDiscontinuity( + ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); + } + // the new fetcher might be providing streams that used to be + // provided by two different fetchers, if one of the fetcher + // paused in the middle while the other somehow paused in next + // seg, we have to start from next seg. + if (seekMode < mStreams[j].mSeekMode) { + seekMode = mStreams[j].mSeekMode; } } } @@ -1550,7 +1575,7 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, segmentStartTimeUs, discontinuitySeq, - switching); + seekMode); } // All fetchers have now been started, the configuration change @@ -1569,25 +1594,57 @@ void LiveSession::onChangeConfiguration3(const sp &msg) { } } -void LiveSession::onSwapped(const sp &msg) { - int32_t switchGeneration; - CHECK(msg->findInt32("switchGeneration", &switchGeneration)); - if (switchGeneration != mSwitchGeneration) { +void LiveSession::swapPacketSource(StreamType stream) { + ALOGV("swapPacketSource: stream = %d", stream); + + // transfer packets from source2 to source + sp &aps = mPacketSources.editValueFor(stream); + sp &aps2 = mPacketSources2.editValueFor(stream); + + // queue discontinuity in mPacketSource + aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); + + // queue packets in mPacketSource2 to mPacketSource + status_t finalResult = OK; + sp accessUnit; + while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && + OK == aps2->dequeueAccessUnit(&accessUnit)) { + aps->queueAccessUnit(accessUnit); + } + aps2->clear(); +} + +void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) { + if (!mSwitchInProgress) { + return; + } + + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { return; } - int32_t stream; - CHECK(msg->findInt32("stream", &stream)); + // Swap packet source of streams provided by old variant + for (size_t idx = 0; idx < kMaxStreams; idx++) { + if (uri == mStreams[idx].mUri) { + StreamType stream = indexToType(idx); - ssize_t idx = typeToIndex(stream); - CHECK(idx >= 0); - if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { - ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); + swapPacketSource(stream); + + if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { + ALOGW("swapping stream type %d %s to empty stream", + stream, mStreams[idx].mUri.c_str()); + } + mStreams[idx].mUri = mStreams[idx].mNewUri; + mStreams[idx].mNewUri.clear(); + + mSwapMask &= ~stream; + } } - mStreams[idx].mUri = mStreams[idx].mNewUri; - mStreams[idx].mNewUri.clear(); - mSwapMask &= ~stream; + mFetcherInfos.editValueAt(index).mToBeRemoved = false; + + ALOGV("tryToFinishBandwidthSwitch: mSwapMask=%x", mSwapMask); if (mSwapMask != 0) { return; } @@ -1595,21 +1652,50 @@ void LiveSession::onSwapped(const sp &msg) { // Check if new variant contains extra streams. uint32_t extraStreams = mNewStreamMask & (~mStreamMask); while (extraStreams) { - StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); - swapPacketSource(extraStream); - extraStreams &= ~extraStream; + StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); + extraStreams &= ~stream; + + swapPacketSource(stream); - idx = typeToIndex(extraStream); + ssize_t idx = typeToIndex(stream); CHECK(idx >= 0); if (mStreams[idx].mNewUri.empty()) { ALOGW("swapping extra stream type %d %s to empty stream", - extraStream, mStreams[idx].mUri.c_str()); + stream, mStreams[idx].mUri.c_str()); } mStreams[idx].mUri = mStreams[idx].mNewUri; mStreams[idx].mNewUri.clear(); } - tryToFinishBandwidthSwitch(); + // Restart new fetcher (it was paused after the first 47k block) + // and let it fetch into mPacketSources (not mPacketSources2) + for (size_t i = 0; i < mFetcherInfos.size(); ++i) { + FetcherInfo &info = mFetcherInfos.editValueAt(i); + if (info.mToBeResumed) { + const AString &uri = mFetcherInfos.keyAt(i); + sp sources[kMaxStreams]; + for (size_t j = 0; j < kMaxStreams; ++j) { + if (uri == mStreams[j].mUri) { + sources[j] = mPacketSources.valueFor(indexToType(j)); + } + } + if (sources[kAudioIndex] != NULL + || sources[kVideoIndex] != NULL + || sources[kSubtitleIndex] != NULL) { + ALOGV("resuming fetcher %s", uri.c_str()); + info.mFetcher->startAsync( + sources[kAudioIndex], + sources[kVideoIndex], + sources[kSubtitleIndex]); + } + info.mToBeResumed = false; + } + } + + mStreamMask = mNewStreamMask; + mSwitchInProgress = false; + + ALOGI("#### Finished Bandwidth Switch"); } void LiveSession::schedulePollBuffering() { @@ -1624,7 +1710,7 @@ void LiveSession::cancelPollBuffering() { void LiveSession::onPollBuffering() { ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " - "mInPreparationPhase %d, mCurBandwidthIndex %d, mStreamMask 0x%x", + "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", mSwitchInProgress, mReconfigurationInProgress, mInPreparationPhase, mCurBandwidthIndex, mStreamMask); @@ -1643,30 +1729,9 @@ void LiveSession::onPollBuffering() { schedulePollBuffering(); } -// Mark switch done when: -// 1. all old buffers are swapped out -void LiveSession::tryToFinishBandwidthSwitch() { - if (!mSwitchInProgress) { - return; - } - - bool needToRemoveFetchers = false; - for (size_t i = 0; i < mFetcherInfos.size(); ++i) { - if (mFetcherInfos.valueAt(i).mToBeRemoved) { - needToRemoveFetchers = true; - break; - } - } - - if (!needToRemoveFetchers && mSwapMask == 0) { - ALOGI("mSwitchInProgress = false"); - mStreamMask = mNewStreamMask; - mSwitchInProgress = false; - } -} - void LiveSession::cancelBandwidthSwitch() { - Mutex::Autolock lock(mSwapMutex); + ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++", mSwitchGeneration); + mSwitchGeneration++; mSwitchInProgress = false; mSwapMask = 0; @@ -1760,6 +1825,7 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { int32_t bandwidthBps; if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) { ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); + mLastBandwidthBps = bandwidthBps; } else { ALOGV("no bandwidth estimate."); return; @@ -1778,7 +1844,7 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { return; } - ALOGI("#### Initiate Bandwidth Switch: %d => %d", + ALOGI("#### Starting Bandwidth Switch: %zd => %zd", mCurBandwidthIndex, bandwidthIndex); changeConfiguration(-1, bandwidthIndex, false); } diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h index 685fefa..cbf988e 100644 --- a/media/libstagefright/httplive/LiveSession.h +++ b/media/libstagefright/httplive/LiveSession.h @@ -54,6 +54,12 @@ struct LiveSession : public AHandler { STREAMTYPE_SUBTITLES = 1 << kSubtitleIndex, }; + enum SeekMode { + kSeekModeExactPosition = 0, // used for seeking + kSeekModeNextSample = 1, // used for seamless switching + kSeekModeNextSegment = 2, // used for seamless switching + }; + LiveSession( const sp ¬ify, uint32_t flags, @@ -63,6 +69,8 @@ struct LiveSession : public AHandler { status_t getStreamFormat(StreamType stream, sp *format); + sp getHTTPDataSource(); + void connectAsync( const char *url, const KeyedVector *headers = NULL); @@ -88,11 +96,6 @@ struct LiveSession : public AHandler { kWhatPreparationFailed, }; - // create a format-change discontinuity - // - // swap: - // whether is format-change discontinuity should trigger a buffer swap - sp createFormatChangeBuffer(bool swap = true); protected: virtual ~LiveSession(); @@ -110,7 +113,6 @@ private: kWhatChangeConfiguration2 = 'chC2', kWhatChangeConfiguration3 = 'chC3', kWhatFinishDisconnect2 = 'fin2', - kWhatSwapped = 'swap', kWhatPollBuffering = 'poll', }; @@ -127,23 +129,22 @@ private: struct FetcherInfo { sp mFetcher; int64_t mDurationUs; - bool mIsPrepared; bool mToBeRemoved; + bool mToBeResumed; }; struct StreamItem { const char *mType; AString mUri, mNewUri; + SeekMode mSeekMode; size_t mCurDiscontinuitySeq; int64_t mLastDequeuedTimeUs; int64_t mLastSampleDurationUs; StreamItem() - : mType(""), - mCurDiscontinuitySeq(0), - mLastDequeuedTimeUs(0), - mLastSampleDurationUs(0) {} + : StreamItem("") {} StreamItem(const char *type) : mType(type), + mSeekMode(kSeekModeExactPosition), mCurDiscontinuitySeq(0), mLastDequeuedTimeUs(0), mLastSampleDurationUs(0) {} @@ -169,6 +170,7 @@ private: Vector mBandwidthItems; ssize_t mCurBandwidthIndex; + int32_t mLastBandwidthBps; sp mBandwidthEstimator; sp mPlaylist; @@ -190,11 +192,6 @@ private: // A second set of packet sources that buffer content for the variant we're switching to. KeyedVector > mPacketSources2; - // A mutex used to serialize two sets of events: - // * the swapping of packet sources in dequeueAccessUnit on the player thread, AND - // * a forced bandwidth switch termination in cancelSwitch on the live looper. - Mutex mSwapMutex; - int32_t mSwitchGeneration; int32_t mSubtitleGeneration; @@ -243,11 +240,15 @@ private: uint32_t block_size = 0, /* reuse DataSource if doing partial fetch */ sp *source = NULL, - String8 *actualUrl = NULL); + String8 *actualUrl = NULL, + /* force connect http even when resuing DataSource */ + bool forceConnectHTTP = false); sp fetchPlaylist( const char *url, uint8_t *curPlaylistHash, bool *unchanged); + float getAbortThreshold( + ssize_t currentBWIndex, ssize_t targetBWIndex) const; void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); size_t getBandwidthIndex(int32_t bandwidthBps); int64_t latestMediaSegmentStartTimeUs(); @@ -261,12 +262,9 @@ private: void onChangeConfiguration(const sp &msg); void onChangeConfiguration2(const sp &msg); void onChangeConfiguration3(const sp &msg); - void onSwapped(const sp &msg); - void tryToFinishBandwidthSwitch(); + void swapPacketSource(StreamType stream); + void tryToFinishBandwidthSwitch(const AString &uri); - // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources - // from being swapped out on stale discontinuities while manipulating - // mPacketSources/mPacketSources2. void cancelBandwidthSwitch(); void schedulePollBuffering(); @@ -279,8 +277,6 @@ private: void postPrepared(status_t err); - void swapPacketSource(StreamType stream); - DISALLOW_EVIL_CONSTRUCTORS(LiveSession); }; diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp index a447010..68f0357 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.cpp +++ b/media/libstagefright/httplive/PlaylistFetcher.cpp @@ -54,6 +54,92 @@ const int64_t PlaylistFetcher::kFetcherResumeThreshold = 100000ll; const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024; const int32_t PlaylistFetcher::kNumSkipFrames = 5; +struct PlaylistFetcher::DownloadState : public RefBase { + DownloadState(); + void resetState(); + bool hasSavedState() const; + void restoreState( + AString &uri, + sp &itemMeta, + sp &buffer, + sp &tsBuffer, + int32_t &firstSeqNumberInPlaylist, + int32_t &lastSeqNumberInPlaylist); + void saveState( + AString &uri, + sp &itemMeta, + sp &buffer, + sp &tsBuffer, + int32_t &firstSeqNumberInPlaylist, + int32_t &lastSeqNumberInPlaylist); + +private: + bool mHasSavedState; + AString mUri; + sp mItemMeta; + sp mBuffer; + sp mTsBuffer; + int32_t mFirstSeqNumberInPlaylist; + int32_t mLastSeqNumberInPlaylist; +}; + +PlaylistFetcher::DownloadState::DownloadState() { + resetState(); +} + +bool PlaylistFetcher::DownloadState::hasSavedState() const { + return mHasSavedState; +} + +void PlaylistFetcher::DownloadState::resetState() { + mHasSavedState = false; + + mUri.clear(); + mItemMeta = NULL; + mBuffer = NULL; + mTsBuffer = NULL; + mFirstSeqNumberInPlaylist = 0; + mLastSeqNumberInPlaylist = 0; +} + +void PlaylistFetcher::DownloadState::restoreState( + AString &uri, + sp &itemMeta, + sp &buffer, + sp &tsBuffer, + int32_t &firstSeqNumberInPlaylist, + int32_t &lastSeqNumberInPlaylist) { + if (!mHasSavedState) { + return; + } + + uri = mUri; + itemMeta = mItemMeta; + buffer = mBuffer; + tsBuffer = mTsBuffer; + firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist; + lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist; + + resetState(); +} + +void PlaylistFetcher::DownloadState::saveState( + AString &uri, + sp &itemMeta, + sp &buffer, + sp &tsBuffer, + int32_t &firstSeqNumberInPlaylist, + int32_t &lastSeqNumberInPlaylist) { + mHasSavedState = true; + + mUri = uri; + mItemMeta = itemMeta; + mBuffer = buffer; + mTsBuffer = tsBuffer; + mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist; + mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist; +} + PlaylistFetcher::PlaylistFetcher( const sp ¬ify, const sp &session, @@ -71,18 +157,21 @@ PlaylistFetcher::PlaylistFetcher( mSeqNumber(-1), mNumRetries(0), mStartup(true), - mAdaptive(false), + mSeekMode(LiveSession::kSeekModeExactPosition), mPrepared(false), mTimeChangeSignaled(false), mNextPTSTimeUs(-1ll), mMonitorQueueGeneration(0), mSubtitleGeneration(subtitleGeneration), mLastDiscontinuitySeq(-1ll), - mStopping(false), mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), mFirstPTSValid(false), - mVideoBuffer(new AnotherPacketSource(NULL)) { + mFirstTimeUs(-1ll), + mVideoBuffer(new AnotherPacketSource(NULL)), + mThresholdRatio(-1.0f), + mDownloadState(new DownloadState()) { memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); + mHTTPDataSource = mSession->getHTTPDataSource(); } PlaylistFetcher::~PlaylistFetcher() { @@ -334,9 +423,12 @@ void PlaylistFetcher::cancelMonitorQueue() { ++mMonitorQueueGeneration; } -void PlaylistFetcher::setStopping(bool stopping) { - AutoMutex _l(mStoppingLock); - mStopping = stopping; +void PlaylistFetcher::setStoppingThreshold(float thresholdRatio) { + AutoMutex _l(mThresholdLock); + if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { + return; + } + mThresholdRatio = thresholdRatio; } void PlaylistFetcher::startAsync( @@ -346,7 +438,7 @@ void PlaylistFetcher::startAsync( int64_t startTimeUs, int64_t segmentStartTimeUs, int32_t startDiscontinuitySeq, - bool adaptive) { + LiveSession::SeekMode seekMode) { sp msg = new AMessage(kWhatStart, this); uint32_t streamTypeMask = 0ul; @@ -370,19 +462,19 @@ void PlaylistFetcher::startAsync( msg->setInt64("startTimeUs", startTimeUs); msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); - msg->setInt32("adaptive", adaptive); + msg->setInt32("seekMode", seekMode); msg->post(); } -void PlaylistFetcher::pauseAsync(bool immediate) { - if (immediate) { - setStopping(true); +void PlaylistFetcher::pauseAsync(float thresholdRatio) { + if (thresholdRatio >= 0.0f) { + setStoppingThreshold(thresholdRatio); } (new AMessage(kWhatPause, this))->post(); } void PlaylistFetcher::stopAsync(bool clear) { - setStopping(true); + setStoppingThreshold(0.0f); sp msg = new AMessage(kWhatStop, this); msg->setInt32("clear", clear); @@ -414,6 +506,10 @@ void PlaylistFetcher::onMessageReceived(const sp &msg) { sp notify = mNotify->dup(); notify->setInt32("what", kWhatPaused); + notify->setInt32("seekMode", + mDownloadState->hasSavedState() + ? LiveSession::kSeekModeNextSample + : LiveSession::kSeekModeNextSegment); notify->post(); break; } @@ -464,6 +560,7 @@ status_t PlaylistFetcher::onStart(const sp &msg) { mStartTimeUsNotify = mNotify->dup(); mStartTimeUsNotify->setInt32("what", kWhatStartedAt); mStartTimeUsNotify->setInt32("streamMask", 0); + mStartTimeUsNotify->setString("uri", mURI); uint32_t streamTypeMask; CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); @@ -471,11 +568,11 @@ status_t PlaylistFetcher::onStart(const sp &msg) { int64_t startTimeUs; int64_t segmentStartTimeUs; int32_t startDiscontinuitySeq; - int32_t adaptive; + int32_t seekMode; CHECK(msg->findInt64("startTimeUs", &startTimeUs)); CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); - CHECK(msg->findInt32("adaptive", &adaptive)); + CHECK(msg->findInt32("seekMode", &seekMode)); if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { void *ptr; @@ -510,6 +607,7 @@ status_t PlaylistFetcher::onStart(const sp &msg) { mDiscontinuitySeq = startDiscontinuitySeq; mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; + mSeekMode = (LiveSession::SeekMode) seekMode; if (startTimeUs >= 0) { mStartTimeUs = startTimeUs; @@ -519,8 +617,8 @@ status_t PlaylistFetcher::onStart(const sp &msg) { mPrepared = false; mIDRFound = false; mTimeChangeSignaled = false; - mAdaptive = adaptive; mVideoBuffer->clear(); + mDownloadState->resetState(); } postMonitorQueue(); @@ -532,7 +630,7 @@ void PlaylistFetcher::onPause() { cancelMonitorQueue(); mLastDiscontinuitySeq = mDiscontinuitySeq; - setStopping(false); + setStoppingThreshold(-1.0f); } void PlaylistFetcher::onStop(const sp &msg) { @@ -547,10 +645,11 @@ void PlaylistFetcher::onStop(const sp &msg) { } } + mDownloadState->resetState(); mPacketSources.clear(); mStreamTypeMask = 0; - setStopping(false); + setStoppingThreshold(-1.0f); } // Resume until we have reached the boundary timestamps listed in `msg`; when @@ -602,9 +701,6 @@ status_t PlaylistFetcher::onResumeUntil(const sp &msg) { // Don't resume if all streams are within a resume threshold if (stopCount == mPacketSources.size()) { - for (size_t i = 0; i < mPacketSources.size(); i++) { - mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); - } stopAsync(/* clear = */ false); return OK; } @@ -742,10 +838,74 @@ bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp& buffer) { return buffer->size() > 0 && buffer->data()[0] == 0x47; } -void PlaylistFetcher::onDownloadNext() { +bool PlaylistFetcher::shouldPauseDownload(bool startFound) { + if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { + // doesn't apply to subtitles + return false; + } + + // If we're switching, save state and pause after start point is found + if (mSeekMode != LiveSession::kSeekModeExactPosition && startFound) { + return true; + } + + // Calculate threshold to abort current download + int32_t targetDurationSecs; + CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); + int64_t targetDurationUs = targetDurationSecs * 1000000ll; + int64_t thresholdUs = -1; + { + AutoMutex _l(mThresholdLock); + thresholdUs = (mThresholdRatio < 0.0f) ? + -1ll : mThresholdRatio * targetDurationUs; + } + + if (thresholdUs < 0) { + // never abort + return false; + } else if (thresholdUs == 0) { + // immediately abort + return true; + } + + // now we have a positive thresholdUs, abort if remaining + // portion to download is over that threshold. + if (mSegmentFirstPTS < 0) { + // this means we haven't even find the first access unit, + // abort now as we must be very far away from the end. + return true; + } + int64_t lastEnqueueUs = mSegmentFirstPTS; + for (size_t i = 0; i < mPacketSources.size(); ++i) { + if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { + continue; + } + sp meta = mPacketSources[i]->getLatestEnqueuedMeta(); + int32_t type; + if (meta == NULL || meta->findInt32("discontinuity", &type)) { + continue; + } + int64_t tmpUs; + CHECK(meta->findInt64("timeUs", &tmpUs)); + if (tmpUs > lastEnqueueUs) { + lastEnqueueUs = tmpUs; + } + } + lastEnqueueUs -= mSegmentFirstPTS; + if (targetDurationUs - lastEnqueueUs > thresholdUs) { + return true; + } + return false; +} + +bool PlaylistFetcher::initDownloadState( + AString &uri, + sp &itemMeta, + int32_t &firstSeqNumberInPlaylist, + int32_t &lastSeqNumberInPlaylist) { status_t err = refreshPlaylist(); - int32_t firstSeqNumberInPlaylist = 0; - int32_t lastSeqNumberInPlaylist = 0; + firstSeqNumberInPlaylist = 0; + lastSeqNumberInPlaylist = 0; bool discontinuity = false; if (mPlaylist != NULL) { @@ -761,6 +921,8 @@ void PlaylistFetcher::onDownloadNext() { } } + mSegmentFirstPTS = -1ll; + if (mPlaylist != NULL && mSeqNumber < 0) { CHECK_GE(mStartTimeUs, 0ll); @@ -788,7 +950,7 @@ void PlaylistFetcher::onDownloadNext() { // timestamps coming from the media container) is used to determine the position // inside a segments. mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs); - if (mAdaptive) { + if (mSeekMode == LiveSession::kSeekModeNextSegment) { // avoid double fetch/decode mSeqNumber += 1; } @@ -838,12 +1000,12 @@ void PlaylistFetcher::onDownloadNext() { mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist, delayUs, mNumRetries); postMonitorQueue(delayUs); - return; + return false; } if (err != OK) { notifyError(err); - return; + return false; } // we've missed the boat, let's start 3 segments prior to the latest sequence @@ -858,12 +1020,8 @@ void PlaylistFetcher::onDownloadNext() { // but since the segments we are supposed to fetch have already rolled off // the playlist, i.e. we have already missed the boat, we inevitably have to // skip. - for (size_t i = 0; i < mPacketSources.size(); i++) { - sp formatChange = mSession->createFormatChangeBuffer(); - mPacketSources.valueAt(i)->queueAccessUnit(formatChange); - } stopAsync(/* clear = */ false); - return; + return false; } mSeqNumber = lastSeqNumberInPlaylist - 3; if (mSeqNumber < firstSeqNumberInPlaylist) { @@ -879,14 +1037,12 @@ void PlaylistFetcher::onDownloadNext() { firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1); notifyError(ERROR_END_OF_STREAM); - return; + return false; } } mNumRetries = 0; - AString uri; - sp itemMeta; CHECK(mPlaylist->itemAt( mSeqNumber - firstSeqNumberInPlaylist, &uri, @@ -909,20 +1065,6 @@ void PlaylistFetcher::onDownloadNext() { } mLastDiscontinuitySeq = -1; - int64_t range_offset, range_length; - if (!itemMeta->findInt64("range-offset", &range_offset) - || !itemMeta->findInt64("range-length", &range_length)) { - range_offset = 0; - range_length = -1; - } - - ALOGV("fetching segment %d from (%d .. %d)", - mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); - - ALOGV("fetching '%s'", uri.c_str()); - - sp source; - sp buffer, tsBuffer; // decrypt a junk buffer to prefetch key; since a session uses only one http connection, // this avoids interleaved connections to the key and segment file. { @@ -932,7 +1074,7 @@ void PlaylistFetcher::onDownloadNext() { true /* first */); if (err != OK) { notifyError(err); - return; + return false; } } @@ -981,13 +1123,57 @@ void PlaylistFetcher::onDownloadNext() { } } + ALOGV("fetching segment %d from (%d .. %d)", + mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); + return true; +} + +void PlaylistFetcher::onDownloadNext() { + AString uri; + sp itemMeta; + sp buffer; + sp tsBuffer; + int32_t firstSeqNumberInPlaylist = 0; + int32_t lastSeqNumberInPlaylist = 0; + bool connectHTTP = true; + + if (mDownloadState->hasSavedState()) { + mDownloadState->restoreState( + uri, + itemMeta, + buffer, + tsBuffer, + firstSeqNumberInPlaylist, + lastSeqNumberInPlaylist); + connectHTTP = false; + ALOGV("resuming: '%s'", uri.c_str()); + } else { + if (!initDownloadState( + uri, + itemMeta, + firstSeqNumberInPlaylist, + lastSeqNumberInPlaylist)) { + return; + } + ALOGV("fetching: '%s'", uri.c_str()); + } + + int64_t range_offset, range_length; + if (!itemMeta->findInt64("range-offset", &range_offset) + || !itemMeta->findInt64("range-length", &range_length)) { + range_offset = 0; + range_length = -1; + } + // block-wise download ssize_t bytesRead; do { - int64_t startUs = ALooper::GetNowUs(); + sp source = mHTTPDataSource; + int64_t startUs = ALooper::GetNowUs(); bytesRead = mSession->fetchFile( - uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source); + uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, + &source, NULL, connectHTTP); // add sample for bandwidth estimation (excluding subtitles) if (bytesRead > 0 @@ -998,6 +1184,8 @@ void PlaylistFetcher::onDownloadNext() { mSession->addBandwidthMeasurement(bytesRead, delayUs); } + connectHTTP = false; + if (bytesRead < 0) { status_t err = bytesRead; ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); @@ -1022,6 +1210,8 @@ void PlaylistFetcher::onDownloadNext() { return; } + bool startUp = mStartup; // save current start up state + err = OK; if (bufferStartsWithTsSyncByte(buffer)) { // Incremental extraction is only supported for MPEG2 transport streams. @@ -1034,7 +1224,6 @@ void PlaylistFetcher::onDownloadNext() { tsBuffer->setRange(tsOff, tsSize); } tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); - err = extractAndQueueAccessUnitsFromTs(tsBuffer); } @@ -1054,9 +1243,18 @@ void PlaylistFetcher::onDownloadNext() { } else if (err != OK) { notifyError(err); return; + } else if (bytesRead != 0 && + shouldPauseDownload(mStartup != startUp /* startFound */)) { + mDownloadState->saveState( + uri, + itemMeta, + buffer, + tsBuffer, + firstSeqNumberInPlaylist, + lastSeqNumberInPlaylist); + return; } - - } while (bytesRead != 0 && !mStopping); + } while (bytesRead != 0); if (bufferStartsWithTsSyncByte(buffer)) { // If we don't see a stream in the program table after fetching a full ts segment @@ -1092,7 +1290,7 @@ void PlaylistFetcher::onDownloadNext() { return; } - err = OK; + status_t err = OK; if (tsBuffer != NULL) { AString method; CHECK(buffer->meta()->findString("cipher-method", &method)); @@ -1125,11 +1323,11 @@ void PlaylistFetcher::onDownloadNext() { } ++mSeqNumber; - postMonitorQueue(); } -int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const { +int32_t PlaylistFetcher::getSeqNumberWithAnchorTime( + int64_t anchorTimeUs, int64_t targetDurationUs) const { int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { @@ -1138,7 +1336,8 @@ int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1; int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1; - while (index >= 0 && anchorTimeUs > mStartTimeUs) { + // adjust anchorTimeUs to within 1x targetDurationUs from mStartTimeUs + while (index >= 0 && anchorTimeUs - mStartTimeUs > targetDurationUs) { sp itemMeta; CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); @@ -1324,6 +1523,9 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu int64_t timeUs; CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); + if (mSegmentFirstPTS < 0ll) { + mSegmentFirstPTS = timeUs; + } if (mStartup) { if (!mFirstPTSValid) { mFirstTimeUs = timeUs; @@ -1336,21 +1538,26 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu } } - if (timeUs < mStartTimeUs || (isAvc && !mIDRFound)) { - // buffer up to the closest preceding IDR frame - ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us", - timeUs, mStartTimeUs); + bool seeking = mSeekMode == LiveSession::kSeekModeExactPosition; + bool startTimeReached = + seeking ? (timeUs >= mStartTimeUs) + : (timeUs > mStartTimeUs); + + if (!startTimeReached || (isAvc && !mIDRFound)) { + // buffer up to the closest preceding IDR frame in the next segement, + // or the closest succeeding IDR frame after the exact position if (isAvc) { - if (IsIDR(accessUnit)) { + if (IsIDR(accessUnit) && (seeking || startTimeReached)) { mVideoBuffer->clear(); mIDRFound = true; } - if (mIDRFound) { + if (mIDRFound && seeking && !startTimeReached) { mVideoBuffer->queueAccessUnit(accessUnit); } } - - continue; + if (!startTimeReached || (isAvc && !mIDRFound)) { + continue; + } } } @@ -1387,7 +1594,8 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu // live stream; re-adjust based on the actual timestamp extracted from the // media segment; if we didn't move backward after the re-adjustment // (newSeqNumber), start at least 1 segment prior. - int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); + int32_t newSeqNumber = getSeqNumberWithAnchorTime( + timeUs, targetDurationUs); if (newSeqNumber >= mSeqNumber) { --mSeqNumber; } else { @@ -1395,6 +1603,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu } mStartTimeUsNotify = mNotify->dup(); mStartTimeUsNotify->setInt32("what", kWhatStartedAt); + mIDRFound = false; return -EAGAIN; } @@ -1413,7 +1622,11 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu if (streamMask == mStreamTypeMask) { mStartup = false; - mStartTimeUsNotify->post(); + // only need to post if we're switching and searching for a + // start point in next segment, or next IDR + if (mSeekMode != LiveSession::kSeekModeExactPosition) { + mStartTimeUsNotify->post(); + } mStartTimeUsNotify.clear(); } } @@ -1428,7 +1641,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &bu || !mStopParams->findInt64(key, &stopTimeUs) || (discontinuitySeq == mDiscontinuitySeq && timeUs >= stopTimeUs)) { - packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); mStreamTypeMask &= ~stream; mPacketSources.removeItemsAt(i); break; @@ -1678,7 +1890,8 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( // Duplicated logic from how we handle .ts playlists. if (mStartup && mSegmentStartTimeUs >= 0 && timeUs - mStartTimeUs > targetDurationUs) { - int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); + int32_t newSeqNumber = getSeqNumberWithAnchorTime( + timeUs, targetDurationUs); if (newSeqNumber >= mSeqNumber) { --mSeqNumber; } else { @@ -1704,7 +1917,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( || discontinuitySeq > mDiscontinuitySeq || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { - packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); mStreamTypeMask = 0; mPacketSources.clear(); return ERROR_OUT_OF_RANGE; diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h index b82e50d..8d34cbc 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.h +++ b/media/libstagefright/httplive/PlaylistFetcher.h @@ -65,9 +65,9 @@ struct PlaylistFetcher : public AHandler { int64_t segmentStartTimeUs = -1ll, // starting position within playlist // startTimeUs!=segmentStartTimeUs only when playlist is live int32_t startDiscontinuitySeq = 0, - bool adaptive = false); + LiveSession::SeekMode seekMode = LiveSession::kSeekModeExactPosition); - void pauseAsync(bool immediate = false); + void pauseAsync(float thresholdRatio); void stopAsync(bool clear = true); @@ -95,6 +95,8 @@ private: kWhatDownloadNext = 'dlnx', }; + struct DownloadState; + static const int64_t kMaxMonitorDelayUs; static const int32_t kNumSkipFrames; @@ -105,6 +107,7 @@ private: sp mNotify; sp mStartTimeUsNotify; + sp mHTTPDataSource; sp mSession; AString mURI; @@ -131,7 +134,7 @@ private: int32_t mNumRetries; bool mStartup; bool mIDRFound; - bool mAdaptive; + int32_t mSeekMode; bool mPrepared; bool mTimeChangeSignaled; int64_t mNextPTSTimeUs; @@ -141,9 +144,6 @@ private: int32_t mLastDiscontinuitySeq; - Mutex mStoppingLock; - bool mStopping; - enum RefreshState { INITIAL_MINIMUM_RELOAD_DELAY, FIRST_UNCHANGED_RELOAD_ATTEMPT, @@ -157,8 +157,8 @@ private: sp mTSParser; bool mFirstPTSValid; - uint64_t mFirstPTS; int64_t mFirstTimeUs; + int64_t mSegmentFirstPTS; sp mVideoBuffer; // Stores the initialization vector to decrypt the next block of cipher text, which can @@ -166,6 +166,11 @@ private: // the last block of cipher text (cipher-block chaining). unsigned char mAESInitVec[16]; + Mutex mThresholdLock; + float mThresholdRatio; + + sp mDownloadState; + // Set first to true if decrypting the first segment of a playlist segment. When // first is true, reset the initialization vector based on the available // information in the manifest; otherwise, use the initialization vector as @@ -181,7 +186,8 @@ private: void postMonitorQueue(int64_t delayUs = 0, int64_t minDelayUs = 0); void cancelMonitorQueue(); - void setStopping(bool stopping); + void setStoppingThreshold(float thresholdRatio); + bool shouldPauseDownload(bool startFound); int64_t delayUsToRefreshPlaylist() const; status_t refreshPlaylist(); @@ -195,6 +201,11 @@ private: void onStop(const sp &msg); void onMonitorQueue(); void onDownloadNext(); + bool initDownloadState( + AString &uri, + sp &itemMeta, + int32_t &firstSeqNumberInPlaylist, + int32_t &lastSeqNumberInPlaylist); // Resume a fetcher to continue until the stopping point stored in msg. status_t onResumeUntil(const sp &msg); @@ -213,7 +224,8 @@ private: void queueDiscontinuity( ATSParser::DiscontinuityType type, const sp &extra); - int32_t getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const; + int32_t getSeqNumberWithAnchorTime( + int64_t anchorTimeUs, int64_t targetDurationUs) const; int32_t getSeqNumberForDiscontinuity(size_t discontinuitySeq) const; int32_t getSeqNumberForTime(int64_t timeUs) const; diff --git a/media/libstagefright/mpeg2ts/ATSParser.h b/media/libstagefright/mpeg2ts/ATSParser.h index 75d76dc..5c50747 100644 --- a/media/libstagefright/mpeg2ts/ATSParser.h +++ b/media/libstagefright/mpeg2ts/ATSParser.h @@ -46,6 +46,9 @@ struct ATSParser : public RefBase { DISCONTINUITY_AUDIO_FORMAT | DISCONTINUITY_VIDEO_FORMAT | DISCONTINUITY_TIME, + DISCONTINUITY_FORMAT_ONLY = + DISCONTINUITY_AUDIO_FORMAT + | DISCONTINUITY_VIDEO_FORMAT, }; enum Flags { diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp index 79a9b04..9f42217 100644 --- a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp +++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp @@ -298,6 +298,7 @@ void AnotherPacketSource::signalEOS(status_t result) { bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) { Mutex::Autolock autoLock(mLock); + *finalResult = OK; if (!mBuffers.empty()) { return true; } @@ -306,6 +307,21 @@ bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) { return false; } +bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) { + Mutex::Autolock autoLock(mLock); + *finalResult = OK; + List >::iterator it; + for (it = mBuffers.begin(); it != mBuffers.end(); it++) { + int32_t discontinuity; + if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) { + return true; + } + } + + *finalResult = mEOSResult; + return false; +} + int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) { Mutex::Autolock autoLock(mLock); return getBufferedDurationUs_l(finalResult); diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.h b/media/libstagefright/mpeg2ts/AnotherPacketSource.h index 809a858..d4fde7c 100644 --- a/media/libstagefright/mpeg2ts/AnotherPacketSource.h +++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.h @@ -43,8 +43,12 @@ struct AnotherPacketSource : public MediaSource { void clear(); + // Returns true if we have any packets including discontinuities bool hasBufferAvailable(status_t *finalResult); + // Returns true if we have packets that's not discontinuities + bool hasDataBufferAvailable(status_t *finalResult); + // Returns the difference between the last and the first queued // presentation timestamps since the last discontinuity (if any). int64_t getBufferedDurationUs(status_t *finalResult); -- cgit v1.1