summaryrefslogtreecommitdiffstats
path: root/media/libstagefright/httplive
diff options
context:
space:
mode:
authorChong Zhang <chz@google.com>2015-03-18 10:31:13 -0700
committerChong Zhang <chz@google.com>2015-03-20 15:30:53 -0700
commita48d372833ccec13c96ece9efcc226e8beac7f59 (patch)
treed525483a442a026414d9ff33550153f65d007b93 /media/libstagefright/httplive
parent7c963e92bc11d4b6a22696c51f9abf42987a1f74 (diff)
downloadframeworks_av-a48d372833ccec13c96ece9efcc226e8beac7f59.zip
frameworks_av-a48d372833ccec13c96ece9efcc226e8beac7f59.tar.gz
frameworks_av-a48d372833ccec13c96ece9efcc226e8beac7f59.tar.bz2
HLS: allow pause/resume in the middle of a segment
- when down switching, decide whether to finish current segment based on bandwidth settings, abort current segment if needed. - when switching, pause new fetcher after the first 47K chunk, and go back to resume old fethcer to stop point immediately. - when old fetcher reaches stop point, swap packet sources and resume new fetcher. - mark switching as done as soon as old fecther reaches stop point. This allows us to resume bandwidth monitoring earlier, and do subsequent switches sooner. bug: 19567254 Change-Id: Iba4b5fb9b06541bb1e49592536648f5d4cbc69ab
Diffstat (limited to 'media/libstagefright/httplive')
-rw-r--r--media/libstagefright/httplive/LiveSession.cpp412
-rw-r--r--media/libstagefright/httplive/LiveSession.h44
-rw-r--r--media/libstagefright/httplive/PlaylistFetcher.cpp354
-rw-r--r--media/libstagefright/httplive/PlaylistFetcher.h30
4 files changed, 563 insertions, 277 deletions
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp
index f5328a6..738f8b6 100644
--- a/media/libstagefright/httplive/LiveSession.cpp
+++ b/media/libstagefright/httplive/LiveSession.cpp
@@ -128,6 +128,7 @@ LiveSession::LiveSession(
mInPreparationPhase(true),
mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
mCurBandwidthIndex(-1),
+ mLastBandwidthBps(-1ll),
mBandwidthEstimator(new BandwidthEstimator()),
mStreamMask(0),
mNewStreamMask(0),
@@ -159,24 +160,6 @@ LiveSession::~LiveSession() {
}
}
-sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
- ABuffer *discontinuity = new ABuffer(0);
- discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
- discontinuity->meta()->setInt32("swapPacketSource", swap);
- discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
- discontinuity->meta()->setInt64("timeUs", -1);
- return discontinuity;
-}
-
-void LiveSession::swapPacketSource(StreamType stream) {
- sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
- sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
- sp<AnotherPacketSource> tmp = aps;
- aps = aps2;
- aps2 = tmp;
- aps2->clear();
-}
-
status_t LiveSession::dequeueAccessUnit(
StreamType stream, sp<ABuffer> *accessUnit) {
if (!(mStreamMask & stream)) {
@@ -189,7 +172,13 @@ status_t LiveSession::dequeueAccessUnit(
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
ssize_t idx = typeToIndex(stream);
- if (!packetSource->hasBufferAvailable(&finalResult)) {
+ // Do not let client pull data if we don't have data packets yet.
+ // We might only have a format discontinuity queued without data.
+ // When NuPlayerDecoder dequeues the format discontinuity, it will
+ // immediately try to getFormat. If we return NULL, NuPlayerDecoder
+ // thinks it can do seamless change, so will not shutdown decoder.
+ // When the actual format arrives, it can't handle it and get stuck.
+ if (!packetSource->hasDataBufferAvailable(&finalResult)) {
if (finalResult == OK) {
return -EAGAIN;
} else {
@@ -197,49 +186,8 @@ status_t LiveSession::dequeueAccessUnit(
}
}
- // Do not let client pull data if we don't have format yet.
- // We might only have a format discontinuity queued without actual data.
- // When NuPlayerDecoder dequeues the format discontinuity, it will
- // immediately try to getFormat. If we return NULL, NuPlayerDecoder
- // thinks it can do seamless change, so will not shutdown decoder.
- // When the actual format arrives, it can't handle it and get stuck.
- // TODO: We need a method to check if the packet source has any
- // data packets available, dequeuing should only start then.
- sp<MetaData> format = packetSource->getFormat();
- if (format == NULL) {
- return -EAGAIN;
- }
- int32_t targetDuration = 0;
- sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
- if (meta != NULL) {
- meta->findInt32("targetDuration", &targetDuration);
- }
-
- int64_t targetDurationUs = targetDuration * 1000000ll;
- if (targetDurationUs == 0 ||
- targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
- // Fetchers limit buffering to
- // min(3 * targetDuration, kMinBufferedDurationUs)
- targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
- }
-
- // wait for counterpart
- sp<AnotherPacketSource> otherSource;
- uint32_t mask = mNewStreamMask & mStreamMask;
- uint32_t fetchersMask = 0;
- for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
- uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
- fetchersMask |= fetcherMask;
- }
- mask &= fetchersMask;
- if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
- otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
- } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
- otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
- }
- if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
- return finalResult == OK ? -EAGAIN : finalResult;
- }
+ // Let the client dequeue as long as we have buffers available
+ // Do not make pause/resume decisions here.
status_t err = packetSource->dequeueAccessUnit(accessUnit);
@@ -278,41 +226,25 @@ status_t LiveSession::dequeueAccessUnit(
type,
extra == NULL ? "NULL" : extra->debugString().c_str());
- int32_t swap;
- if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
- int32_t switchGeneration;
- CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
- {
- Mutex::Autolock lock(mSwapMutex);
- if (switchGeneration == mSwitchGeneration) {
- swapPacketSource(stream);
- sp<AMessage> msg = new AMessage(kWhatSwapped, this);
- msg->setInt32("stream", stream);
- msg->setInt32("switchGeneration", switchGeneration);
- msg->post();
- }
- }
+ size_t seq = strm.mCurDiscontinuitySeq;
+ int64_t offsetTimeUs;
+ if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
+ offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
} else {
- size_t seq = strm.mCurDiscontinuitySeq;
- int64_t offsetTimeUs;
- if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
- offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
- } else {
- offsetTimeUs = 0;
- }
-
- seq += 1;
- if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
- int64_t firstTimeUs;
- firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
- offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
- offsetTimeUs += strm.mLastSampleDurationUs;
- } else {
- offsetTimeUs += strm.mLastSampleDurationUs;
- }
+ offsetTimeUs = 0;
+ }
- mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
+ seq += 1;
+ if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
+ int64_t firstTimeUs;
+ firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
+ offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
+ offsetTimeUs += strm.mLastSampleDurationUs;
+ } else {
+ offsetTimeUs += strm.mLastSampleDurationUs;
}
+
+ mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
} else if (err == OK) {
if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
@@ -373,7 +305,6 @@ status_t LiveSession::dequeueAccessUnit(
}
status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
- // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
if (!(mStreamMask & stream)) {
return UNKNOWN_ERROR;
}
@@ -389,6 +320,10 @@ status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
return convertMetaDataToMessage(meta, format);
}
+sp<HTTPBase> LiveSession::getHTTPDataSource() {
+ return new MediaHTTP(mHTTPService->makeHTTPConnection());
+}
+
void LiveSession::connectAsync(
const char *url, const KeyedVector<String8, String8> *headers) {
sp<AMessage> msg = new AMessage(kWhatConnect, this);
@@ -468,21 +403,27 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
case PlaylistFetcher::kWhatPaused:
case PlaylistFetcher::kWhatStopped:
{
+ AString uri;
+ CHECK(msg->findString("uri", &uri));
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index < 0) {
+ // ignore msgs from fetchers that's already gone
+ break;
+ }
+
if (what == PlaylistFetcher::kWhatStopped) {
- AString uri;
- CHECK(msg->findString("uri", &uri));
- ssize_t index = mFetcherInfos.indexOfKey(uri);
- if (index < 0) {
- // ignore duplicated kWhatStopped messages.
- break;
- }
+ tryToFinishBandwidthSwitch(uri);
mFetcherLooper->unregisterHandler(
mFetcherInfos[index].mFetcher->id());
mFetcherInfos.removeItemsAt(index);
-
- if (mSwitchInProgress) {
- tryToFinishBandwidthSwitch();
+ } else if (what == PlaylistFetcher::kWhatPaused) {
+ int32_t seekMode;
+ CHECK(msg->findInt32("seekMode", &seekMode));
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if (mStreams[i].mUri == uri) {
+ mStreams[i].mSeekMode = (SeekMode) seekMode;
+ }
}
}
@@ -564,6 +505,13 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
+ AString uri;
+ CHECK(msg->findString("uri", &uri));
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index >= 0) {
+ mFetcherInfos.editValueAt(index).mToBeResumed = true;
+ }
+
// Resume fetcher for the original variant; the resumed fetcher should
// continue until the timestamps found in msg, which is stored by the
// new fetcher to indicate where the new variant has started buffering.
@@ -607,12 +555,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
- case kWhatSwapped:
- {
- onSwapped(msg);
- break;
- }
-
case kWhatPollBuffering:
{
int32_t generation;
@@ -751,11 +693,10 @@ void LiveSession::onConnect(const sp<AMessage> &msg) {
}
void LiveSession::finishDisconnect() {
+ ALOGV("finishDisconnect");
+
// No reconfiguration is currently pending, make sure none will trigger
// during disconnection either.
-
- // Protect mPacketSources from a swapPacketSource race condition through disconnect.
- // (finishDisconnect, onFinishDisconnect2)
cancelBandwidthSwitch();
// cancel buffer polling
@@ -805,8 +746,8 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
FetcherInfo info;
info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
info.mDurationUs = -1ll;
- info.mIsPrepared = false;
info.mToBeRemoved = false;
+ info.mToBeResumed = false;
mFetcherLooper->registerHandler(info.mFetcher);
mFetcherInfos.add(uri, info);
@@ -835,14 +776,15 @@ ssize_t LiveSession::fetchFile(
int64_t range_offset, int64_t range_length,
uint32_t block_size, /* download block size */
sp<DataSource> *source, /* to return and reuse source */
- String8 *actualUrl) {
+ String8 *actualUrl,
+ bool forceConnectHTTP /* force connect HTTP when resuing source */) {
off64_t size;
sp<DataSource> temp_source;
if (source == NULL) {
source = &temp_source;
}
- if (*source == NULL) {
+ if (*source == NULL || forceConnectHTTP) {
if (!strncasecmp(url, "file://", 7)) {
*source = new FileSource(url + 7);
} else if (strncasecmp(url, "http://", 7)
@@ -861,13 +803,18 @@ ssize_t LiveSession::fetchFile(
? "" : AStringPrintf("%lld",
range_offset + range_length - 1).c_str()).c_str()));
}
- status_t err = mHTTPDataSource->connect(url, &headers);
+
+ HTTPBase* httpDataSource =
+ (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get();
+ status_t err = httpDataSource->connect(url, &headers);
if (err != OK) {
return err;
}
- *source = mHTTPDataSource;
+ if (*source == NULL) {
+ *source = mHTTPDataSource;
+ }
}
}
@@ -1003,6 +950,57 @@ static double uniformRand() {
}
#endif
+float LiveSession::getAbortThreshold(
+ ssize_t currentBWIndex, ssize_t targetBWIndex) const {
+ float abortThreshold = -1.0f;
+ if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
+ /*
+ If we're switching down, we need to decide whether to
+
+ 1) finish last segment of high-bandwidth variant, or
+ 2) abort last segment of high-bandwidth variant, and fetch an
+ overlapping portion from low-bandwidth variant.
+
+ Here we try to maximize the amount of buffer left when the
+ switch point is met. Given the following parameters:
+
+ B: our current buffering level in seconds
+ T: target duration in seconds
+ X: sample duration in seconds remain to fetch in last segment
+ bw0: bandwidth of old variant (as specified in playlist)
+ bw1: bandwidth of new variant (as specified in playlist)
+ bw: measured bandwidth available
+
+ If we choose 1), when switch happens at the end of current
+ segment, our buffering will be
+ B + X - X * bw0 / bw
+
+ If we choose 2), when switch happens where we aborted current
+ segment, our buffering will be
+ B - (T - X) * bw1 / bw
+
+ We should only choose 1) if
+ X/T < bw1 / (bw1 + bw0 - bw)
+ */
+
+ CHECK(mLastBandwidthBps >= 0);
+ abortThreshold =
+ (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
+ / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
+ + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
+ - (float)mLastBandwidthBps * 0.7f);
+ if (abortThreshold < 0.0f) {
+ abortThreshold = -1.0f; // do not abort
+ }
+ ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
+ mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
+ mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
+ mLastBandwidthBps,
+ abortThreshold);
+ }
+ return abortThreshold;
+}
+
void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}
@@ -1210,8 +1208,6 @@ void LiveSession::changeConfiguration(
CHECK(!mReconfigurationInProgress);
mReconfigurationInProgress = true;
- mCurBandwidthIndex = bandwidthIndex;
-
ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
timeUs, bandwidthIndex, pickTrack);
@@ -1238,7 +1234,6 @@ void LiveSession::changeConfiguration(
if (timeUs < 0ll) {
// delay fetcher removal if not picking tracks
discardFetcher = pickTrack;
-
}
for (size_t j = 0; j < kMaxStreams; ++j) {
@@ -1253,12 +1248,24 @@ void LiveSession::changeConfiguration(
if (discardFetcher) {
mFetcherInfos.valueAt(i).mFetcher->stopAsync();
} else {
- // if we're seeking, pause immediately (no need to finish the segment)
- bool immediate = (timeUs >= 0ll);
- mFetcherInfos.valueAt(i).mFetcher->pauseAsync(immediate);
+ float threshold = -1.0f; // always finish fetching by default
+ if (timeUs >= 0ll) {
+ // seeking, no need to finish fetching
+ threshold = 0.0f;
+ } else if (!pickTrack) {
+ // adapting, abort if remaining of current segment is over threshold
+ threshold = getAbortThreshold(
+ mCurBandwidthIndex, bandwidthIndex);
+ }
+
+ ALOGV("Pausing with threshold %.3f", threshold);
+
+ mFetcherInfos.valueAt(i).mFetcher->pauseAsync(threshold);
}
}
+ mCurBandwidthIndex = bandwidthIndex;
+
sp<AMessage> msg;
if (timeUs < 0ll) {
// skip onChangeConfiguration2 (decoder destruction) if not seeking.
@@ -1451,7 +1458,6 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
}
// streamMask now only contains the types that need a new fetcher created.
-
if (streamMask != 0) {
ALOGV("creating new fetchers for mask 0x%08x", streamMask);
}
@@ -1472,6 +1478,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
int64_t startTimeUs = -1;
int64_t segmentStartTimeUs = -1ll;
int32_t discontinuitySeq = -1;
+ SeekMode seekMode = kSeekModeExactPosition;
sp<AnotherPacketSource> sources[kMaxStreams];
if (i == kSubtitleIndex) {
@@ -1491,6 +1498,16 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
sp<AMessage> meta;
if (pickTrack) {
// selecting
+
+ // FIXME:
+ // This should only apply to the track that's being picked, we
+ // need a bitmask to indicate that.
+ //
+ // It's possible that selectTrack() gets called during a bandwidth
+ // switch, and we needed to fetch a new variant. The new fetcher
+ // should start from where old fetcher left off, not where decoder
+ // is dequeueing at.
+
meta = sources[j]->getLatestDequeuedMeta();
} else {
// adapting
@@ -1527,14 +1544,22 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
ALOGV("stream[%zu]: queue format change", j);
sources[j]->queueDiscontinuity(
- ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
+ ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
} else {
// adapting, queue discontinuities after resume
sources[j] = mPacketSources2.valueFor(indexToType(j));
sources[j]->clear();
uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
if (extraStreams & indexToType(j)) {
- sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
+ sources[j]->queueDiscontinuity(
+ ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
+ }
+ // the new fetcher might be providing streams that used to be
+ // provided by two different fetchers, if one of the fetcher
+ // paused in the middle while the other somehow paused in next
+ // seg, we have to start from next seg.
+ if (seekMode < mStreams[j].mSeekMode) {
+ seekMode = mStreams[j].mSeekMode;
}
}
}
@@ -1550,7 +1575,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
segmentStartTimeUs,
discontinuitySeq,
- switching);
+ seekMode);
}
// All fetchers have now been started, the configuration change
@@ -1569,25 +1594,57 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
}
}
-void LiveSession::onSwapped(const sp<AMessage> &msg) {
- int32_t switchGeneration;
- CHECK(msg->findInt32("switchGeneration", &switchGeneration));
- if (switchGeneration != mSwitchGeneration) {
+void LiveSession::swapPacketSource(StreamType stream) {
+ ALOGV("swapPacketSource: stream = %d", stream);
+
+ // transfer packets from source2 to source
+ sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
+ sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
+
+ // queue discontinuity in mPacketSource
+ aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
+
+ // queue packets in mPacketSource2 to mPacketSource
+ status_t finalResult = OK;
+ sp<ABuffer> accessUnit;
+ while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
+ OK == aps2->dequeueAccessUnit(&accessUnit)) {
+ aps->queueAccessUnit(accessUnit);
+ }
+ aps2->clear();
+}
+
+void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) {
+ if (!mSwitchInProgress) {
+ return;
+ }
+
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
return;
}
- int32_t stream;
- CHECK(msg->findInt32("stream", &stream));
+ // Swap packet source of streams provided by old variant
+ for (size_t idx = 0; idx < kMaxStreams; idx++) {
+ if (uri == mStreams[idx].mUri) {
+ StreamType stream = indexToType(idx);
- ssize_t idx = typeToIndex(stream);
- CHECK(idx >= 0);
- if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
- ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
+ swapPacketSource(stream);
+
+ if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
+ ALOGW("swapping stream type %d %s to empty stream",
+ stream, mStreams[idx].mUri.c_str());
+ }
+ mStreams[idx].mUri = mStreams[idx].mNewUri;
+ mStreams[idx].mNewUri.clear();
+
+ mSwapMask &= ~stream;
+ }
}
- mStreams[idx].mUri = mStreams[idx].mNewUri;
- mStreams[idx].mNewUri.clear();
- mSwapMask &= ~stream;
+ mFetcherInfos.editValueAt(index).mToBeRemoved = false;
+
+ ALOGV("tryToFinishBandwidthSwitch: mSwapMask=%x", mSwapMask);
if (mSwapMask != 0) {
return;
}
@@ -1595,21 +1652,50 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) {
// Check if new variant contains extra streams.
uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
while (extraStreams) {
- StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
- swapPacketSource(extraStream);
- extraStreams &= ~extraStream;
+ StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
+ extraStreams &= ~stream;
+
+ swapPacketSource(stream);
- idx = typeToIndex(extraStream);
+ ssize_t idx = typeToIndex(stream);
CHECK(idx >= 0);
if (mStreams[idx].mNewUri.empty()) {
ALOGW("swapping extra stream type %d %s to empty stream",
- extraStream, mStreams[idx].mUri.c_str());
+ stream, mStreams[idx].mUri.c_str());
}
mStreams[idx].mUri = mStreams[idx].mNewUri;
mStreams[idx].mNewUri.clear();
}
- tryToFinishBandwidthSwitch();
+ // Restart new fetcher (it was paused after the first 47k block)
+ // and let it fetch into mPacketSources (not mPacketSources2)
+ for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
+ FetcherInfo &info = mFetcherInfos.editValueAt(i);
+ if (info.mToBeResumed) {
+ const AString &uri = mFetcherInfos.keyAt(i);
+ sp<AnotherPacketSource> sources[kMaxStreams];
+ for (size_t j = 0; j < kMaxStreams; ++j) {
+ if (uri == mStreams[j].mUri) {
+ sources[j] = mPacketSources.valueFor(indexToType(j));
+ }
+ }
+ if (sources[kAudioIndex] != NULL
+ || sources[kVideoIndex] != NULL
+ || sources[kSubtitleIndex] != NULL) {
+ ALOGV("resuming fetcher %s", uri.c_str());
+ info.mFetcher->startAsync(
+ sources[kAudioIndex],
+ sources[kVideoIndex],
+ sources[kSubtitleIndex]);
+ }
+ info.mToBeResumed = false;
+ }
+ }
+
+ mStreamMask = mNewStreamMask;
+ mSwitchInProgress = false;
+
+ ALOGI("#### Finished Bandwidth Switch");
}
void LiveSession::schedulePollBuffering() {
@@ -1624,7 +1710,7 @@ void LiveSession::cancelPollBuffering() {
void LiveSession::onPollBuffering() {
ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
- "mInPreparationPhase %d, mCurBandwidthIndex %d, mStreamMask 0x%x",
+ "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
mSwitchInProgress, mReconfigurationInProgress,
mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
@@ -1643,30 +1729,9 @@ void LiveSession::onPollBuffering() {
schedulePollBuffering();
}
-// Mark switch done when:
-// 1. all old buffers are swapped out
-void LiveSession::tryToFinishBandwidthSwitch() {
- if (!mSwitchInProgress) {
- return;
- }
-
- bool needToRemoveFetchers = false;
- for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
- if (mFetcherInfos.valueAt(i).mToBeRemoved) {
- needToRemoveFetchers = true;
- break;
- }
- }
-
- if (!needToRemoveFetchers && mSwapMask == 0) {
- ALOGI("mSwitchInProgress = false");
- mStreamMask = mNewStreamMask;
- mSwitchInProgress = false;
- }
-}
-
void LiveSession::cancelBandwidthSwitch() {
- Mutex::Autolock lock(mSwapMutex);
+ ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++", mSwitchGeneration);
+
mSwitchGeneration++;
mSwitchInProgress = false;
mSwapMask = 0;
@@ -1760,6 +1825,7 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
int32_t bandwidthBps;
if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) {
ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
+ mLastBandwidthBps = bandwidthBps;
} else {
ALOGV("no bandwidth estimate.");
return;
@@ -1778,7 +1844,7 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
return;
}
- ALOGI("#### Initiate Bandwidth Switch: %d => %d",
+ ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
mCurBandwidthIndex, bandwidthIndex);
changeConfiguration(-1, bandwidthIndex, false);
}
diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h
index 685fefa..cbf988e 100644
--- a/media/libstagefright/httplive/LiveSession.h
+++ b/media/libstagefright/httplive/LiveSession.h
@@ -54,6 +54,12 @@ struct LiveSession : public AHandler {
STREAMTYPE_SUBTITLES = 1 << kSubtitleIndex,
};
+ enum SeekMode {
+ kSeekModeExactPosition = 0, // used for seeking
+ kSeekModeNextSample = 1, // used for seamless switching
+ kSeekModeNextSegment = 2, // used for seamless switching
+ };
+
LiveSession(
const sp<AMessage> &notify,
uint32_t flags,
@@ -63,6 +69,8 @@ struct LiveSession : public AHandler {
status_t getStreamFormat(StreamType stream, sp<AMessage> *format);
+ sp<HTTPBase> getHTTPDataSource();
+
void connectAsync(
const char *url,
const KeyedVector<String8, String8> *headers = NULL);
@@ -88,11 +96,6 @@ struct LiveSession : public AHandler {
kWhatPreparationFailed,
};
- // create a format-change discontinuity
- //
- // swap:
- // whether is format-change discontinuity should trigger a buffer swap
- sp<ABuffer> createFormatChangeBuffer(bool swap = true);
protected:
virtual ~LiveSession();
@@ -110,7 +113,6 @@ private:
kWhatChangeConfiguration2 = 'chC2',
kWhatChangeConfiguration3 = 'chC3',
kWhatFinishDisconnect2 = 'fin2',
- kWhatSwapped = 'swap',
kWhatPollBuffering = 'poll',
};
@@ -127,23 +129,22 @@ private:
struct FetcherInfo {
sp<PlaylistFetcher> mFetcher;
int64_t mDurationUs;
- bool mIsPrepared;
bool mToBeRemoved;
+ bool mToBeResumed;
};
struct StreamItem {
const char *mType;
AString mUri, mNewUri;
+ SeekMode mSeekMode;
size_t mCurDiscontinuitySeq;
int64_t mLastDequeuedTimeUs;
int64_t mLastSampleDurationUs;
StreamItem()
- : mType(""),
- mCurDiscontinuitySeq(0),
- mLastDequeuedTimeUs(0),
- mLastSampleDurationUs(0) {}
+ : StreamItem("") {}
StreamItem(const char *type)
: mType(type),
+ mSeekMode(kSeekModeExactPosition),
mCurDiscontinuitySeq(0),
mLastDequeuedTimeUs(0),
mLastSampleDurationUs(0) {}
@@ -169,6 +170,7 @@ private:
Vector<BandwidthItem> mBandwidthItems;
ssize_t mCurBandwidthIndex;
+ int32_t mLastBandwidthBps;
sp<BandwidthEstimator> mBandwidthEstimator;
sp<M3UParser> mPlaylist;
@@ -190,11 +192,6 @@ private:
// A second set of packet sources that buffer content for the variant we're switching to.
KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2;
- // A mutex used to serialize two sets of events:
- // * the swapping of packet sources in dequeueAccessUnit on the player thread, AND
- // * a forced bandwidth switch termination in cancelSwitch on the live looper.
- Mutex mSwapMutex;
-
int32_t mSwitchGeneration;
int32_t mSubtitleGeneration;
@@ -243,11 +240,15 @@ private:
uint32_t block_size = 0,
/* reuse DataSource if doing partial fetch */
sp<DataSource> *source = NULL,
- String8 *actualUrl = NULL);
+ String8 *actualUrl = NULL,
+ /* force connect http even when resuing DataSource */
+ bool forceConnectHTTP = false);
sp<M3UParser> fetchPlaylist(
const char *url, uint8_t *curPlaylistHash, bool *unchanged);
+ float getAbortThreshold(
+ ssize_t currentBWIndex, ssize_t targetBWIndex) const;
void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
size_t getBandwidthIndex(int32_t bandwidthBps);
int64_t latestMediaSegmentStartTimeUs();
@@ -261,12 +262,9 @@ private:
void onChangeConfiguration(const sp<AMessage> &msg);
void onChangeConfiguration2(const sp<AMessage> &msg);
void onChangeConfiguration3(const sp<AMessage> &msg);
- void onSwapped(const sp<AMessage> &msg);
- void tryToFinishBandwidthSwitch();
+ void swapPacketSource(StreamType stream);
+ void tryToFinishBandwidthSwitch(const AString &uri);
- // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources
- // from being swapped out on stale discontinuities while manipulating
- // mPacketSources/mPacketSources2.
void cancelBandwidthSwitch();
void schedulePollBuffering();
@@ -279,8 +277,6 @@ private:
void postPrepared(status_t err);
- void swapPacketSource(StreamType stream);
-
DISALLOW_EVIL_CONSTRUCTORS(LiveSession);
};
diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp
index a447010..68f0357 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.cpp
+++ b/media/libstagefright/httplive/PlaylistFetcher.cpp
@@ -54,6 +54,92 @@ const int64_t PlaylistFetcher::kFetcherResumeThreshold = 100000ll;
const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
const int32_t PlaylistFetcher::kNumSkipFrames = 5;
+struct PlaylistFetcher::DownloadState : public RefBase {
+ DownloadState();
+ void resetState();
+ bool hasSavedState() const;
+ void restoreState(
+ AString &uri,
+ sp<AMessage> &itemMeta,
+ sp<ABuffer> &buffer,
+ sp<ABuffer> &tsBuffer,
+ int32_t &firstSeqNumberInPlaylist,
+ int32_t &lastSeqNumberInPlaylist);
+ void saveState(
+ AString &uri,
+ sp<AMessage> &itemMeta,
+ sp<ABuffer> &buffer,
+ sp<ABuffer> &tsBuffer,
+ int32_t &firstSeqNumberInPlaylist,
+ int32_t &lastSeqNumberInPlaylist);
+
+private:
+ bool mHasSavedState;
+ AString mUri;
+ sp<AMessage> mItemMeta;
+ sp<ABuffer> mBuffer;
+ sp<ABuffer> mTsBuffer;
+ int32_t mFirstSeqNumberInPlaylist;
+ int32_t mLastSeqNumberInPlaylist;
+};
+
+PlaylistFetcher::DownloadState::DownloadState() {
+ resetState();
+}
+
+bool PlaylistFetcher::DownloadState::hasSavedState() const {
+ return mHasSavedState;
+}
+
+void PlaylistFetcher::DownloadState::resetState() {
+ mHasSavedState = false;
+
+ mUri.clear();
+ mItemMeta = NULL;
+ mBuffer = NULL;
+ mTsBuffer = NULL;
+ mFirstSeqNumberInPlaylist = 0;
+ mLastSeqNumberInPlaylist = 0;
+}
+
+void PlaylistFetcher::DownloadState::restoreState(
+ AString &uri,
+ sp<AMessage> &itemMeta,
+ sp<ABuffer> &buffer,
+ sp<ABuffer> &tsBuffer,
+ int32_t &firstSeqNumberInPlaylist,
+ int32_t &lastSeqNumberInPlaylist) {
+ if (!mHasSavedState) {
+ return;
+ }
+
+ uri = mUri;
+ itemMeta = mItemMeta;
+ buffer = mBuffer;
+ tsBuffer = mTsBuffer;
+ firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
+ lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
+
+ resetState();
+}
+
+void PlaylistFetcher::DownloadState::saveState(
+ AString &uri,
+ sp<AMessage> &itemMeta,
+ sp<ABuffer> &buffer,
+ sp<ABuffer> &tsBuffer,
+ int32_t &firstSeqNumberInPlaylist,
+ int32_t &lastSeqNumberInPlaylist) {
+ mHasSavedState = true;
+
+ mUri = uri;
+ mItemMeta = itemMeta;
+ mBuffer = buffer;
+ mTsBuffer = tsBuffer;
+ mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
+ mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
+}
+
PlaylistFetcher::PlaylistFetcher(
const sp<AMessage> &notify,
const sp<LiveSession> &session,
@@ -71,18 +157,21 @@ PlaylistFetcher::PlaylistFetcher(
mSeqNumber(-1),
mNumRetries(0),
mStartup(true),
- mAdaptive(false),
+ mSeekMode(LiveSession::kSeekModeExactPosition),
mPrepared(false),
mTimeChangeSignaled(false),
mNextPTSTimeUs(-1ll),
mMonitorQueueGeneration(0),
mSubtitleGeneration(subtitleGeneration),
mLastDiscontinuitySeq(-1ll),
- mStopping(false),
mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
mFirstPTSValid(false),
- mVideoBuffer(new AnotherPacketSource(NULL)) {
+ mFirstTimeUs(-1ll),
+ mVideoBuffer(new AnotherPacketSource(NULL)),
+ mThresholdRatio(-1.0f),
+ mDownloadState(new DownloadState()) {
memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
+ mHTTPDataSource = mSession->getHTTPDataSource();
}
PlaylistFetcher::~PlaylistFetcher() {
@@ -334,9 +423,12 @@ void PlaylistFetcher::cancelMonitorQueue() {
++mMonitorQueueGeneration;
}
-void PlaylistFetcher::setStopping(bool stopping) {
- AutoMutex _l(mStoppingLock);
- mStopping = stopping;
+void PlaylistFetcher::setStoppingThreshold(float thresholdRatio) {
+ AutoMutex _l(mThresholdLock);
+ if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
+ return;
+ }
+ mThresholdRatio = thresholdRatio;
}
void PlaylistFetcher::startAsync(
@@ -346,7 +438,7 @@ void PlaylistFetcher::startAsync(
int64_t startTimeUs,
int64_t segmentStartTimeUs,
int32_t startDiscontinuitySeq,
- bool adaptive) {
+ LiveSession::SeekMode seekMode) {
sp<AMessage> msg = new AMessage(kWhatStart, this);
uint32_t streamTypeMask = 0ul;
@@ -370,19 +462,19 @@ void PlaylistFetcher::startAsync(
msg->setInt64("startTimeUs", startTimeUs);
msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
- msg->setInt32("adaptive", adaptive);
+ msg->setInt32("seekMode", seekMode);
msg->post();
}
-void PlaylistFetcher::pauseAsync(bool immediate) {
- if (immediate) {
- setStopping(true);
+void PlaylistFetcher::pauseAsync(float thresholdRatio) {
+ if (thresholdRatio >= 0.0f) {
+ setStoppingThreshold(thresholdRatio);
}
(new AMessage(kWhatPause, this))->post();
}
void PlaylistFetcher::stopAsync(bool clear) {
- setStopping(true);
+ setStoppingThreshold(0.0f);
sp<AMessage> msg = new AMessage(kWhatStop, this);
msg->setInt32("clear", clear);
@@ -414,6 +506,10 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatPaused);
+ notify->setInt32("seekMode",
+ mDownloadState->hasSavedState()
+ ? LiveSession::kSeekModeNextSample
+ : LiveSession::kSeekModeNextSegment);
notify->post();
break;
}
@@ -464,6 +560,7 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
mStartTimeUsNotify = mNotify->dup();
mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
mStartTimeUsNotify->setInt32("streamMask", 0);
+ mStartTimeUsNotify->setString("uri", mURI);
uint32_t streamTypeMask;
CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
@@ -471,11 +568,11 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
int64_t startTimeUs;
int64_t segmentStartTimeUs;
int32_t startDiscontinuitySeq;
- int32_t adaptive;
+ int32_t seekMode;
CHECK(msg->findInt64("startTimeUs", &startTimeUs));
CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
- CHECK(msg->findInt32("adaptive", &adaptive));
+ CHECK(msg->findInt32("seekMode", &seekMode));
if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
void *ptr;
@@ -510,6 +607,7 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
mDiscontinuitySeq = startDiscontinuitySeq;
mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
+ mSeekMode = (LiveSession::SeekMode) seekMode;
if (startTimeUs >= 0) {
mStartTimeUs = startTimeUs;
@@ -519,8 +617,8 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
mPrepared = false;
mIDRFound = false;
mTimeChangeSignaled = false;
- mAdaptive = adaptive;
mVideoBuffer->clear();
+ mDownloadState->resetState();
}
postMonitorQueue();
@@ -532,7 +630,7 @@ void PlaylistFetcher::onPause() {
cancelMonitorQueue();
mLastDiscontinuitySeq = mDiscontinuitySeq;
- setStopping(false);
+ setStoppingThreshold(-1.0f);
}
void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
@@ -547,10 +645,11 @@ void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
}
}
+ mDownloadState->resetState();
mPacketSources.clear();
mStreamTypeMask = 0;
- setStopping(false);
+ setStoppingThreshold(-1.0f);
}
// Resume until we have reached the boundary timestamps listed in `msg`; when
@@ -602,9 +701,6 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
// Don't resume if all streams are within a resume threshold
if (stopCount == mPacketSources.size()) {
- for (size_t i = 0; i < mPacketSources.size(); i++) {
- mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
- }
stopAsync(/* clear = */ false);
return OK;
}
@@ -742,10 +838,74 @@ bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
return buffer->size() > 0 && buffer->data()[0] == 0x47;
}
-void PlaylistFetcher::onDownloadNext() {
+bool PlaylistFetcher::shouldPauseDownload(bool startFound) {
+ if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
+ // doesn't apply to subtitles
+ return false;
+ }
+
+ // If we're switching, save state and pause after start point is found
+ if (mSeekMode != LiveSession::kSeekModeExactPosition && startFound) {
+ return true;
+ }
+
+ // Calculate threshold to abort current download
+ int32_t targetDurationSecs;
+ CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
+ int64_t targetDurationUs = targetDurationSecs * 1000000ll;
+ int64_t thresholdUs = -1;
+ {
+ AutoMutex _l(mThresholdLock);
+ thresholdUs = (mThresholdRatio < 0.0f) ?
+ -1ll : mThresholdRatio * targetDurationUs;
+ }
+
+ if (thresholdUs < 0) {
+ // never abort
+ return false;
+ } else if (thresholdUs == 0) {
+ // immediately abort
+ return true;
+ }
+
+ // now we have a positive thresholdUs, abort if remaining
+ // portion to download is over that threshold.
+ if (mSegmentFirstPTS < 0) {
+ // this means we haven't even find the first access unit,
+ // abort now as we must be very far away from the end.
+ return true;
+ }
+ int64_t lastEnqueueUs = mSegmentFirstPTS;
+ for (size_t i = 0; i < mPacketSources.size(); ++i) {
+ if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
+ continue;
+ }
+ sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
+ int32_t type;
+ if (meta == NULL || meta->findInt32("discontinuity", &type)) {
+ continue;
+ }
+ int64_t tmpUs;
+ CHECK(meta->findInt64("timeUs", &tmpUs));
+ if (tmpUs > lastEnqueueUs) {
+ lastEnqueueUs = tmpUs;
+ }
+ }
+ lastEnqueueUs -= mSegmentFirstPTS;
+ if (targetDurationUs - lastEnqueueUs > thresholdUs) {
+ return true;
+ }
+ return false;
+}
+
+bool PlaylistFetcher::initDownloadState(
+ AString &uri,
+ sp<AMessage> &itemMeta,
+ int32_t &firstSeqNumberInPlaylist,
+ int32_t &lastSeqNumberInPlaylist) {
status_t err = refreshPlaylist();
- int32_t firstSeqNumberInPlaylist = 0;
- int32_t lastSeqNumberInPlaylist = 0;
+ firstSeqNumberInPlaylist = 0;
+ lastSeqNumberInPlaylist = 0;
bool discontinuity = false;
if (mPlaylist != NULL) {
@@ -761,6 +921,8 @@ void PlaylistFetcher::onDownloadNext() {
}
}
+ mSegmentFirstPTS = -1ll;
+
if (mPlaylist != NULL && mSeqNumber < 0) {
CHECK_GE(mStartTimeUs, 0ll);
@@ -788,7 +950,7 @@ void PlaylistFetcher::onDownloadNext() {
// timestamps coming from the media container) is used to determine the position
// inside a segments.
mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
- if (mAdaptive) {
+ if (mSeekMode == LiveSession::kSeekModeNextSegment) {
// avoid double fetch/decode
mSeqNumber += 1;
}
@@ -838,12 +1000,12 @@ void PlaylistFetcher::onDownloadNext() {
mSeqNumber, firstSeqNumberInPlaylist,
lastSeqNumberInPlaylist, delayUs, mNumRetries);
postMonitorQueue(delayUs);
- return;
+ return false;
}
if (err != OK) {
notifyError(err);
- return;
+ return false;
}
// we've missed the boat, let's start 3 segments prior to the latest sequence
@@ -858,12 +1020,8 @@ void PlaylistFetcher::onDownloadNext() {
// but since the segments we are supposed to fetch have already rolled off
// the playlist, i.e. we have already missed the boat, we inevitably have to
// skip.
- for (size_t i = 0; i < mPacketSources.size(); i++) {
- sp<ABuffer> formatChange = mSession->createFormatChangeBuffer();
- mPacketSources.valueAt(i)->queueAccessUnit(formatChange);
- }
stopAsync(/* clear = */ false);
- return;
+ return false;
}
mSeqNumber = lastSeqNumberInPlaylist - 3;
if (mSeqNumber < firstSeqNumberInPlaylist) {
@@ -879,14 +1037,12 @@ void PlaylistFetcher::onDownloadNext() {
firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
notifyError(ERROR_END_OF_STREAM);
- return;
+ return false;
}
}
mNumRetries = 0;
- AString uri;
- sp<AMessage> itemMeta;
CHECK(mPlaylist->itemAt(
mSeqNumber - firstSeqNumberInPlaylist,
&uri,
@@ -909,20 +1065,6 @@ void PlaylistFetcher::onDownloadNext() {
}
mLastDiscontinuitySeq = -1;
- int64_t range_offset, range_length;
- if (!itemMeta->findInt64("range-offset", &range_offset)
- || !itemMeta->findInt64("range-length", &range_length)) {
- range_offset = 0;
- range_length = -1;
- }
-
- ALOGV("fetching segment %d from (%d .. %d)",
- mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
-
- ALOGV("fetching '%s'", uri.c_str());
-
- sp<DataSource> source;
- sp<ABuffer> buffer, tsBuffer;
// decrypt a junk buffer to prefetch key; since a session uses only one http connection,
// this avoids interleaved connections to the key and segment file.
{
@@ -932,7 +1074,7 @@ void PlaylistFetcher::onDownloadNext() {
true /* first */);
if (err != OK) {
notifyError(err);
- return;
+ return false;
}
}
@@ -981,13 +1123,57 @@ void PlaylistFetcher::onDownloadNext() {
}
}
+ ALOGV("fetching segment %d from (%d .. %d)",
+ mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
+ return true;
+}
+
+void PlaylistFetcher::onDownloadNext() {
+ AString uri;
+ sp<AMessage> itemMeta;
+ sp<ABuffer> buffer;
+ sp<ABuffer> tsBuffer;
+ int32_t firstSeqNumberInPlaylist = 0;
+ int32_t lastSeqNumberInPlaylist = 0;
+ bool connectHTTP = true;
+
+ if (mDownloadState->hasSavedState()) {
+ mDownloadState->restoreState(
+ uri,
+ itemMeta,
+ buffer,
+ tsBuffer,
+ firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist);
+ connectHTTP = false;
+ ALOGV("resuming: '%s'", uri.c_str());
+ } else {
+ if (!initDownloadState(
+ uri,
+ itemMeta,
+ firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist)) {
+ return;
+ }
+ ALOGV("fetching: '%s'", uri.c_str());
+ }
+
+ int64_t range_offset, range_length;
+ if (!itemMeta->findInt64("range-offset", &range_offset)
+ || !itemMeta->findInt64("range-length", &range_length)) {
+ range_offset = 0;
+ range_length = -1;
+ }
+
// block-wise download
ssize_t bytesRead;
do {
- int64_t startUs = ALooper::GetNowUs();
+ sp<DataSource> source = mHTTPDataSource;
+ int64_t startUs = ALooper::GetNowUs();
bytesRead = mSession->fetchFile(
- uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
+ uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
+ &source, NULL, connectHTTP);
// add sample for bandwidth estimation (excluding subtitles)
if (bytesRead > 0
@@ -998,6 +1184,8 @@ void PlaylistFetcher::onDownloadNext() {
mSession->addBandwidthMeasurement(bytesRead, delayUs);
}
+ connectHTTP = false;
+
if (bytesRead < 0) {
status_t err = bytesRead;
ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
@@ -1022,6 +1210,8 @@ void PlaylistFetcher::onDownloadNext() {
return;
}
+ bool startUp = mStartup; // save current start up state
+
err = OK;
if (bufferStartsWithTsSyncByte(buffer)) {
// Incremental extraction is only supported for MPEG2 transport streams.
@@ -1034,7 +1224,6 @@ void PlaylistFetcher::onDownloadNext() {
tsBuffer->setRange(tsOff, tsSize);
}
tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
-
err = extractAndQueueAccessUnitsFromTs(tsBuffer);
}
@@ -1054,9 +1243,18 @@ void PlaylistFetcher::onDownloadNext() {
} else if (err != OK) {
notifyError(err);
return;
+ } else if (bytesRead != 0 &&
+ shouldPauseDownload(mStartup != startUp /* startFound */)) {
+ mDownloadState->saveState(
+ uri,
+ itemMeta,
+ buffer,
+ tsBuffer,
+ firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist);
+ return;
}
-
- } while (bytesRead != 0 && !mStopping);
+ } while (bytesRead != 0);
if (bufferStartsWithTsSyncByte(buffer)) {
// If we don't see a stream in the program table after fetching a full ts segment
@@ -1092,7 +1290,7 @@ void PlaylistFetcher::onDownloadNext() {
return;
}
- err = OK;
+ status_t err = OK;
if (tsBuffer != NULL) {
AString method;
CHECK(buffer->meta()->findString("cipher-method", &method));
@@ -1125,11 +1323,11 @@ void PlaylistFetcher::onDownloadNext() {
}
++mSeqNumber;
-
postMonitorQueue();
}
-int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const {
+int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(
+ int64_t anchorTimeUs, int64_t targetDurationUs) const {
int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
if (mPlaylist->meta() == NULL
|| !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
@@ -1138,7 +1336,8 @@ int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const
lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;
int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
- while (index >= 0 && anchorTimeUs > mStartTimeUs) {
+ // adjust anchorTimeUs to within 1x targetDurationUs from mStartTimeUs
+ while (index >= 0 && anchorTimeUs - mStartTimeUs > targetDurationUs) {
sp<AMessage> itemMeta;
CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
@@ -1324,6 +1523,9 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
int64_t timeUs;
CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
+ if (mSegmentFirstPTS < 0ll) {
+ mSegmentFirstPTS = timeUs;
+ }
if (mStartup) {
if (!mFirstPTSValid) {
mFirstTimeUs = timeUs;
@@ -1336,21 +1538,26 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
}
}
- if (timeUs < mStartTimeUs || (isAvc && !mIDRFound)) {
- // buffer up to the closest preceding IDR frame
- ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
- timeUs, mStartTimeUs);
+ bool seeking = mSeekMode == LiveSession::kSeekModeExactPosition;
+ bool startTimeReached =
+ seeking ? (timeUs >= mStartTimeUs)
+ : (timeUs > mStartTimeUs);
+
+ if (!startTimeReached || (isAvc && !mIDRFound)) {
+ // buffer up to the closest preceding IDR frame in the next segement,
+ // or the closest succeeding IDR frame after the exact position
if (isAvc) {
- if (IsIDR(accessUnit)) {
+ if (IsIDR(accessUnit) && (seeking || startTimeReached)) {
mVideoBuffer->clear();
mIDRFound = true;
}
- if (mIDRFound) {
+ if (mIDRFound && seeking && !startTimeReached) {
mVideoBuffer->queueAccessUnit(accessUnit);
}
}
-
- continue;
+ if (!startTimeReached || (isAvc && !mIDRFound)) {
+ continue;
+ }
}
}
@@ -1387,7 +1594,8 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
// live stream; re-adjust based on the actual timestamp extracted from the
// media segment; if we didn't move backward after the re-adjustment
// (newSeqNumber), start at least 1 segment prior.
- int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
+ int32_t newSeqNumber = getSeqNumberWithAnchorTime(
+ timeUs, targetDurationUs);
if (newSeqNumber >= mSeqNumber) {
--mSeqNumber;
} else {
@@ -1395,6 +1603,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
}
mStartTimeUsNotify = mNotify->dup();
mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
+ mIDRFound = false;
return -EAGAIN;
}
@@ -1413,7 +1622,11 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
if (streamMask == mStreamTypeMask) {
mStartup = false;
- mStartTimeUsNotify->post();
+ // only need to post if we're switching and searching for a
+ // start point in next segment, or next IDR
+ if (mSeekMode != LiveSession::kSeekModeExactPosition) {
+ mStartTimeUsNotify->post();
+ }
mStartTimeUsNotify.clear();
}
}
@@ -1428,7 +1641,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
|| !mStopParams->findInt64(key, &stopTimeUs)
|| (discontinuitySeq == mDiscontinuitySeq
&& timeUs >= stopTimeUs)) {
- packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
mStreamTypeMask &= ~stream;
mPacketSources.removeItemsAt(i);
break;
@@ -1678,7 +1890,8 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
// Duplicated logic from how we handle .ts playlists.
if (mStartup && mSegmentStartTimeUs >= 0
&& timeUs - mStartTimeUs > targetDurationUs) {
- int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
+ int32_t newSeqNumber = getSeqNumberWithAnchorTime(
+ timeUs, targetDurationUs);
if (newSeqNumber >= mSeqNumber) {
--mSeqNumber;
} else {
@@ -1704,7 +1917,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
|| discontinuitySeq > mDiscontinuitySeq
|| !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
|| (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
- packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
mStreamTypeMask = 0;
mPacketSources.clear();
return ERROR_OUT_OF_RANGE;
diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h
index b82e50d..8d34cbc 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.h
+++ b/media/libstagefright/httplive/PlaylistFetcher.h
@@ -65,9 +65,9 @@ struct PlaylistFetcher : public AHandler {
int64_t segmentStartTimeUs = -1ll, // starting position within playlist
// startTimeUs!=segmentStartTimeUs only when playlist is live
int32_t startDiscontinuitySeq = 0,
- bool adaptive = false);
+ LiveSession::SeekMode seekMode = LiveSession::kSeekModeExactPosition);
- void pauseAsync(bool immediate = false);
+ void pauseAsync(float thresholdRatio);
void stopAsync(bool clear = true);
@@ -95,6 +95,8 @@ private:
kWhatDownloadNext = 'dlnx',
};
+ struct DownloadState;
+
static const int64_t kMaxMonitorDelayUs;
static const int32_t kNumSkipFrames;
@@ -105,6 +107,7 @@ private:
sp<AMessage> mNotify;
sp<AMessage> mStartTimeUsNotify;
+ sp<HTTPBase> mHTTPDataSource;
sp<LiveSession> mSession;
AString mURI;
@@ -131,7 +134,7 @@ private:
int32_t mNumRetries;
bool mStartup;
bool mIDRFound;
- bool mAdaptive;
+ int32_t mSeekMode;
bool mPrepared;
bool mTimeChangeSignaled;
int64_t mNextPTSTimeUs;
@@ -141,9 +144,6 @@ private:
int32_t mLastDiscontinuitySeq;
- Mutex mStoppingLock;
- bool mStopping;
-
enum RefreshState {
INITIAL_MINIMUM_RELOAD_DELAY,
FIRST_UNCHANGED_RELOAD_ATTEMPT,
@@ -157,8 +157,8 @@ private:
sp<ATSParser> mTSParser;
bool mFirstPTSValid;
- uint64_t mFirstPTS;
int64_t mFirstTimeUs;
+ int64_t mSegmentFirstPTS;
sp<AnotherPacketSource> mVideoBuffer;
// Stores the initialization vector to decrypt the next block of cipher text, which can
@@ -166,6 +166,11 @@ private:
// the last block of cipher text (cipher-block chaining).
unsigned char mAESInitVec[16];
+ Mutex mThresholdLock;
+ float mThresholdRatio;
+
+ sp<DownloadState> mDownloadState;
+
// Set first to true if decrypting the first segment of a playlist segment. When
// first is true, reset the initialization vector based on the available
// information in the manifest; otherwise, use the initialization vector as
@@ -181,7 +186,8 @@ private:
void postMonitorQueue(int64_t delayUs = 0, int64_t minDelayUs = 0);
void cancelMonitorQueue();
- void setStopping(bool stopping);
+ void setStoppingThreshold(float thresholdRatio);
+ bool shouldPauseDownload(bool startFound);
int64_t delayUsToRefreshPlaylist() const;
status_t refreshPlaylist();
@@ -195,6 +201,11 @@ private:
void onStop(const sp<AMessage> &msg);
void onMonitorQueue();
void onDownloadNext();
+ bool initDownloadState(
+ AString &uri,
+ sp<AMessage> &itemMeta,
+ int32_t &firstSeqNumberInPlaylist,
+ int32_t &lastSeqNumberInPlaylist);
// Resume a fetcher to continue until the stopping point stored in msg.
status_t onResumeUntil(const sp<AMessage> &msg);
@@ -213,7 +224,8 @@ private:
void queueDiscontinuity(
ATSParser::DiscontinuityType type, const sp<AMessage> &extra);
- int32_t getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const;
+ int32_t getSeqNumberWithAnchorTime(
+ int64_t anchorTimeUs, int64_t targetDurationUs) const;
int32_t getSeqNumberForDiscontinuity(size_t discontinuitySeq) const;
int32_t getSeqNumberForTime(int64_t timeUs) const;