summaryrefslogtreecommitdiffstats
path: root/media/libstagefright/httplive/LiveSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'media/libstagefright/httplive/LiveSession.cpp')
-rw-r--r--media/libstagefright/httplive/LiveSession.cpp412
1 files changed, 239 insertions, 173 deletions
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp
index f5328a6..738f8b6 100644
--- a/media/libstagefright/httplive/LiveSession.cpp
+++ b/media/libstagefright/httplive/LiveSession.cpp
@@ -128,6 +128,7 @@ LiveSession::LiveSession(
mInPreparationPhase(true),
mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
mCurBandwidthIndex(-1),
+ mLastBandwidthBps(-1ll),
mBandwidthEstimator(new BandwidthEstimator()),
mStreamMask(0),
mNewStreamMask(0),
@@ -159,24 +160,6 @@ LiveSession::~LiveSession() {
}
}
-sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
- ABuffer *discontinuity = new ABuffer(0);
- discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
- discontinuity->meta()->setInt32("swapPacketSource", swap);
- discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
- discontinuity->meta()->setInt64("timeUs", -1);
- return discontinuity;
-}
-
-void LiveSession::swapPacketSource(StreamType stream) {
- sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
- sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
- sp<AnotherPacketSource> tmp = aps;
- aps = aps2;
- aps2 = tmp;
- aps2->clear();
-}
-
status_t LiveSession::dequeueAccessUnit(
StreamType stream, sp<ABuffer> *accessUnit) {
if (!(mStreamMask & stream)) {
@@ -189,7 +172,13 @@ status_t LiveSession::dequeueAccessUnit(
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
ssize_t idx = typeToIndex(stream);
- if (!packetSource->hasBufferAvailable(&finalResult)) {
+ // Do not let client pull data if we don't have data packets yet.
+ // We might only have a format discontinuity queued without data.
+ // When NuPlayerDecoder dequeues the format discontinuity, it will
+ // immediately try to getFormat. If we return NULL, NuPlayerDecoder
+ // thinks it can do seamless change, so will not shutdown decoder.
+ // When the actual format arrives, it can't handle it and get stuck.
+ if (!packetSource->hasDataBufferAvailable(&finalResult)) {
if (finalResult == OK) {
return -EAGAIN;
} else {
@@ -197,49 +186,8 @@ status_t LiveSession::dequeueAccessUnit(
}
}
- // Do not let client pull data if we don't have format yet.
- // We might only have a format discontinuity queued without actual data.
- // When NuPlayerDecoder dequeues the format discontinuity, it will
- // immediately try to getFormat. If we return NULL, NuPlayerDecoder
- // thinks it can do seamless change, so will not shutdown decoder.
- // When the actual format arrives, it can't handle it and get stuck.
- // TODO: We need a method to check if the packet source has any
- // data packets available, dequeuing should only start then.
- sp<MetaData> format = packetSource->getFormat();
- if (format == NULL) {
- return -EAGAIN;
- }
- int32_t targetDuration = 0;
- sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
- if (meta != NULL) {
- meta->findInt32("targetDuration", &targetDuration);
- }
-
- int64_t targetDurationUs = targetDuration * 1000000ll;
- if (targetDurationUs == 0 ||
- targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
- // Fetchers limit buffering to
- // min(3 * targetDuration, kMinBufferedDurationUs)
- targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
- }
-
- // wait for counterpart
- sp<AnotherPacketSource> otherSource;
- uint32_t mask = mNewStreamMask & mStreamMask;
- uint32_t fetchersMask = 0;
- for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
- uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
- fetchersMask |= fetcherMask;
- }
- mask &= fetchersMask;
- if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
- otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
- } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
- otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
- }
- if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
- return finalResult == OK ? -EAGAIN : finalResult;
- }
+ // Let the client dequeue as long as we have buffers available
+ // Do not make pause/resume decisions here.
status_t err = packetSource->dequeueAccessUnit(accessUnit);
@@ -278,41 +226,25 @@ status_t LiveSession::dequeueAccessUnit(
type,
extra == NULL ? "NULL" : extra->debugString().c_str());
- int32_t swap;
- if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
- int32_t switchGeneration;
- CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
- {
- Mutex::Autolock lock(mSwapMutex);
- if (switchGeneration == mSwitchGeneration) {
- swapPacketSource(stream);
- sp<AMessage> msg = new AMessage(kWhatSwapped, this);
- msg->setInt32("stream", stream);
- msg->setInt32("switchGeneration", switchGeneration);
- msg->post();
- }
- }
+ size_t seq = strm.mCurDiscontinuitySeq;
+ int64_t offsetTimeUs;
+ if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
+ offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
} else {
- size_t seq = strm.mCurDiscontinuitySeq;
- int64_t offsetTimeUs;
- if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
- offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
- } else {
- offsetTimeUs = 0;
- }
-
- seq += 1;
- if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
- int64_t firstTimeUs;
- firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
- offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
- offsetTimeUs += strm.mLastSampleDurationUs;
- } else {
- offsetTimeUs += strm.mLastSampleDurationUs;
- }
+ offsetTimeUs = 0;
+ }
- mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
+ seq += 1;
+ if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
+ int64_t firstTimeUs;
+ firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
+ offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
+ offsetTimeUs += strm.mLastSampleDurationUs;
+ } else {
+ offsetTimeUs += strm.mLastSampleDurationUs;
}
+
+ mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
} else if (err == OK) {
if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
@@ -373,7 +305,6 @@ status_t LiveSession::dequeueAccessUnit(
}
status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
- // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
if (!(mStreamMask & stream)) {
return UNKNOWN_ERROR;
}
@@ -389,6 +320,10 @@ status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
return convertMetaDataToMessage(meta, format);
}
+sp<HTTPBase> LiveSession::getHTTPDataSource() {
+ return new MediaHTTP(mHTTPService->makeHTTPConnection());
+}
+
void LiveSession::connectAsync(
const char *url, const KeyedVector<String8, String8> *headers) {
sp<AMessage> msg = new AMessage(kWhatConnect, this);
@@ -468,21 +403,27 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
case PlaylistFetcher::kWhatPaused:
case PlaylistFetcher::kWhatStopped:
{
+ AString uri;
+ CHECK(msg->findString("uri", &uri));
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index < 0) {
+ // ignore msgs from fetchers that's already gone
+ break;
+ }
+
if (what == PlaylistFetcher::kWhatStopped) {
- AString uri;
- CHECK(msg->findString("uri", &uri));
- ssize_t index = mFetcherInfos.indexOfKey(uri);
- if (index < 0) {
- // ignore duplicated kWhatStopped messages.
- break;
- }
+ tryToFinishBandwidthSwitch(uri);
mFetcherLooper->unregisterHandler(
mFetcherInfos[index].mFetcher->id());
mFetcherInfos.removeItemsAt(index);
-
- if (mSwitchInProgress) {
- tryToFinishBandwidthSwitch();
+ } else if (what == PlaylistFetcher::kWhatPaused) {
+ int32_t seekMode;
+ CHECK(msg->findInt32("seekMode", &seekMode));
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if (mStreams[i].mUri == uri) {
+ mStreams[i].mSeekMode = (SeekMode) seekMode;
+ }
}
}
@@ -564,6 +505,13 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
+ AString uri;
+ CHECK(msg->findString("uri", &uri));
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index >= 0) {
+ mFetcherInfos.editValueAt(index).mToBeResumed = true;
+ }
+
// Resume fetcher for the original variant; the resumed fetcher should
// continue until the timestamps found in msg, which is stored by the
// new fetcher to indicate where the new variant has started buffering.
@@ -607,12 +555,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
- case kWhatSwapped:
- {
- onSwapped(msg);
- break;
- }
-
case kWhatPollBuffering:
{
int32_t generation;
@@ -751,11 +693,10 @@ void LiveSession::onConnect(const sp<AMessage> &msg) {
}
void LiveSession::finishDisconnect() {
+ ALOGV("finishDisconnect");
+
// No reconfiguration is currently pending, make sure none will trigger
// during disconnection either.
-
- // Protect mPacketSources from a swapPacketSource race condition through disconnect.
- // (finishDisconnect, onFinishDisconnect2)
cancelBandwidthSwitch();
// cancel buffer polling
@@ -805,8 +746,8 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
FetcherInfo info;
info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
info.mDurationUs = -1ll;
- info.mIsPrepared = false;
info.mToBeRemoved = false;
+ info.mToBeResumed = false;
mFetcherLooper->registerHandler(info.mFetcher);
mFetcherInfos.add(uri, info);
@@ -835,14 +776,15 @@ ssize_t LiveSession::fetchFile(
int64_t range_offset, int64_t range_length,
uint32_t block_size, /* download block size */
sp<DataSource> *source, /* to return and reuse source */
- String8 *actualUrl) {
+ String8 *actualUrl,
+ bool forceConnectHTTP /* force connect HTTP when resuing source */) {
off64_t size;
sp<DataSource> temp_source;
if (source == NULL) {
source = &temp_source;
}
- if (*source == NULL) {
+ if (*source == NULL || forceConnectHTTP) {
if (!strncasecmp(url, "file://", 7)) {
*source = new FileSource(url + 7);
} else if (strncasecmp(url, "http://", 7)
@@ -861,13 +803,18 @@ ssize_t LiveSession::fetchFile(
? "" : AStringPrintf("%lld",
range_offset + range_length - 1).c_str()).c_str()));
}
- status_t err = mHTTPDataSource->connect(url, &headers);
+
+ HTTPBase* httpDataSource =
+ (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get();
+ status_t err = httpDataSource->connect(url, &headers);
if (err != OK) {
return err;
}
- *source = mHTTPDataSource;
+ if (*source == NULL) {
+ *source = mHTTPDataSource;
+ }
}
}
@@ -1003,6 +950,57 @@ static double uniformRand() {
}
#endif
+float LiveSession::getAbortThreshold(
+ ssize_t currentBWIndex, ssize_t targetBWIndex) const {
+ float abortThreshold = -1.0f;
+ if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
+ /*
+ If we're switching down, we need to decide whether to
+
+ 1) finish last segment of high-bandwidth variant, or
+ 2) abort last segment of high-bandwidth variant, and fetch an
+ overlapping portion from low-bandwidth variant.
+
+ Here we try to maximize the amount of buffer left when the
+ switch point is met. Given the following parameters:
+
+ B: our current buffering level in seconds
+ T: target duration in seconds
+ X: sample duration in seconds remain to fetch in last segment
+ bw0: bandwidth of old variant (as specified in playlist)
+ bw1: bandwidth of new variant (as specified in playlist)
+ bw: measured bandwidth available
+
+ If we choose 1), when switch happens at the end of current
+ segment, our buffering will be
+ B + X - X * bw0 / bw
+
+ If we choose 2), when switch happens where we aborted current
+ segment, our buffering will be
+ B - (T - X) * bw1 / bw
+
+ We should only choose 1) if
+ X/T < bw1 / (bw1 + bw0 - bw)
+ */
+
+ CHECK(mLastBandwidthBps >= 0);
+ abortThreshold =
+ (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
+ / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
+ + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
+ - (float)mLastBandwidthBps * 0.7f);
+ if (abortThreshold < 0.0f) {
+ abortThreshold = -1.0f; // do not abort
+ }
+ ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
+ mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
+ mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
+ mLastBandwidthBps,
+ abortThreshold);
+ }
+ return abortThreshold;
+}
+
void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}
@@ -1210,8 +1208,6 @@ void LiveSession::changeConfiguration(
CHECK(!mReconfigurationInProgress);
mReconfigurationInProgress = true;
- mCurBandwidthIndex = bandwidthIndex;
-
ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
timeUs, bandwidthIndex, pickTrack);
@@ -1238,7 +1234,6 @@ void LiveSession::changeConfiguration(
if (timeUs < 0ll) {
// delay fetcher removal if not picking tracks
discardFetcher = pickTrack;
-
}
for (size_t j = 0; j < kMaxStreams; ++j) {
@@ -1253,12 +1248,24 @@ void LiveSession::changeConfiguration(
if (discardFetcher) {
mFetcherInfos.valueAt(i).mFetcher->stopAsync();
} else {
- // if we're seeking, pause immediately (no need to finish the segment)
- bool immediate = (timeUs >= 0ll);
- mFetcherInfos.valueAt(i).mFetcher->pauseAsync(immediate);
+ float threshold = -1.0f; // always finish fetching by default
+ if (timeUs >= 0ll) {
+ // seeking, no need to finish fetching
+ threshold = 0.0f;
+ } else if (!pickTrack) {
+ // adapting, abort if remaining of current segment is over threshold
+ threshold = getAbortThreshold(
+ mCurBandwidthIndex, bandwidthIndex);
+ }
+
+ ALOGV("Pausing with threshold %.3f", threshold);
+
+ mFetcherInfos.valueAt(i).mFetcher->pauseAsync(threshold);
}
}
+ mCurBandwidthIndex = bandwidthIndex;
+
sp<AMessage> msg;
if (timeUs < 0ll) {
// skip onChangeConfiguration2 (decoder destruction) if not seeking.
@@ -1451,7 +1458,6 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
}
// streamMask now only contains the types that need a new fetcher created.
-
if (streamMask != 0) {
ALOGV("creating new fetchers for mask 0x%08x", streamMask);
}
@@ -1472,6 +1478,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
int64_t startTimeUs = -1;
int64_t segmentStartTimeUs = -1ll;
int32_t discontinuitySeq = -1;
+ SeekMode seekMode = kSeekModeExactPosition;
sp<AnotherPacketSource> sources[kMaxStreams];
if (i == kSubtitleIndex) {
@@ -1491,6 +1498,16 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
sp<AMessage> meta;
if (pickTrack) {
// selecting
+
+ // FIXME:
+ // This should only apply to the track that's being picked, we
+ // need a bitmask to indicate that.
+ //
+ // It's possible that selectTrack() gets called during a bandwidth
+ // switch, and we needed to fetch a new variant. The new fetcher
+ // should start from where old fetcher left off, not where decoder
+ // is dequeueing at.
+
meta = sources[j]->getLatestDequeuedMeta();
} else {
// adapting
@@ -1527,14 +1544,22 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
ALOGV("stream[%zu]: queue format change", j);
sources[j]->queueDiscontinuity(
- ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
+ ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
} else {
// adapting, queue discontinuities after resume
sources[j] = mPacketSources2.valueFor(indexToType(j));
sources[j]->clear();
uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
if (extraStreams & indexToType(j)) {
- sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
+ sources[j]->queueDiscontinuity(
+ ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
+ }
+ // the new fetcher might be providing streams that used to be
+ // provided by two different fetchers, if one of the fetcher
+ // paused in the middle while the other somehow paused in next
+ // seg, we have to start from next seg.
+ if (seekMode < mStreams[j].mSeekMode) {
+ seekMode = mStreams[j].mSeekMode;
}
}
}
@@ -1550,7 +1575,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
segmentStartTimeUs,
discontinuitySeq,
- switching);
+ seekMode);
}
// All fetchers have now been started, the configuration change
@@ -1569,25 +1594,57 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
}
}
-void LiveSession::onSwapped(const sp<AMessage> &msg) {
- int32_t switchGeneration;
- CHECK(msg->findInt32("switchGeneration", &switchGeneration));
- if (switchGeneration != mSwitchGeneration) {
+void LiveSession::swapPacketSource(StreamType stream) {
+ ALOGV("swapPacketSource: stream = %d", stream);
+
+ // transfer packets from source2 to source
+ sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
+ sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
+
+ // queue discontinuity in mPacketSource
+ aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);
+
+ // queue packets in mPacketSource2 to mPacketSource
+ status_t finalResult = OK;
+ sp<ABuffer> accessUnit;
+ while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
+ OK == aps2->dequeueAccessUnit(&accessUnit)) {
+ aps->queueAccessUnit(accessUnit);
+ }
+ aps2->clear();
+}
+
+void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) {
+ if (!mSwitchInProgress) {
+ return;
+ }
+
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
return;
}
- int32_t stream;
- CHECK(msg->findInt32("stream", &stream));
+ // Swap packet source of streams provided by old variant
+ for (size_t idx = 0; idx < kMaxStreams; idx++) {
+ if (uri == mStreams[idx].mUri) {
+ StreamType stream = indexToType(idx);
- ssize_t idx = typeToIndex(stream);
- CHECK(idx >= 0);
- if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
- ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
+ swapPacketSource(stream);
+
+ if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
+ ALOGW("swapping stream type %d %s to empty stream",
+ stream, mStreams[idx].mUri.c_str());
+ }
+ mStreams[idx].mUri = mStreams[idx].mNewUri;
+ mStreams[idx].mNewUri.clear();
+
+ mSwapMask &= ~stream;
+ }
}
- mStreams[idx].mUri = mStreams[idx].mNewUri;
- mStreams[idx].mNewUri.clear();
- mSwapMask &= ~stream;
+ mFetcherInfos.editValueAt(index).mToBeRemoved = false;
+
+ ALOGV("tryToFinishBandwidthSwitch: mSwapMask=%x", mSwapMask);
if (mSwapMask != 0) {
return;
}
@@ -1595,21 +1652,50 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) {
// Check if new variant contains extra streams.
uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
while (extraStreams) {
- StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
- swapPacketSource(extraStream);
- extraStreams &= ~extraStream;
+ StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
+ extraStreams &= ~stream;
+
+ swapPacketSource(stream);
- idx = typeToIndex(extraStream);
+ ssize_t idx = typeToIndex(stream);
CHECK(idx >= 0);
if (mStreams[idx].mNewUri.empty()) {
ALOGW("swapping extra stream type %d %s to empty stream",
- extraStream, mStreams[idx].mUri.c_str());
+ stream, mStreams[idx].mUri.c_str());
}
mStreams[idx].mUri = mStreams[idx].mNewUri;
mStreams[idx].mNewUri.clear();
}
- tryToFinishBandwidthSwitch();
+ // Restart new fetcher (it was paused after the first 47k block)
+ // and let it fetch into mPacketSources (not mPacketSources2)
+ for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
+ FetcherInfo &info = mFetcherInfos.editValueAt(i);
+ if (info.mToBeResumed) {
+ const AString &uri = mFetcherInfos.keyAt(i);
+ sp<AnotherPacketSource> sources[kMaxStreams];
+ for (size_t j = 0; j < kMaxStreams; ++j) {
+ if (uri == mStreams[j].mUri) {
+ sources[j] = mPacketSources.valueFor(indexToType(j));
+ }
+ }
+ if (sources[kAudioIndex] != NULL
+ || sources[kVideoIndex] != NULL
+ || sources[kSubtitleIndex] != NULL) {
+ ALOGV("resuming fetcher %s", uri.c_str());
+ info.mFetcher->startAsync(
+ sources[kAudioIndex],
+ sources[kVideoIndex],
+ sources[kSubtitleIndex]);
+ }
+ info.mToBeResumed = false;
+ }
+ }
+
+ mStreamMask = mNewStreamMask;
+ mSwitchInProgress = false;
+
+ ALOGI("#### Finished Bandwidth Switch");
}
void LiveSession::schedulePollBuffering() {
@@ -1624,7 +1710,7 @@ void LiveSession::cancelPollBuffering() {
void LiveSession::onPollBuffering() {
ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
- "mInPreparationPhase %d, mCurBandwidthIndex %d, mStreamMask 0x%x",
+ "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
mSwitchInProgress, mReconfigurationInProgress,
mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
@@ -1643,30 +1729,9 @@ void LiveSession::onPollBuffering() {
schedulePollBuffering();
}
-// Mark switch done when:
-// 1. all old buffers are swapped out
-void LiveSession::tryToFinishBandwidthSwitch() {
- if (!mSwitchInProgress) {
- return;
- }
-
- bool needToRemoveFetchers = false;
- for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
- if (mFetcherInfos.valueAt(i).mToBeRemoved) {
- needToRemoveFetchers = true;
- break;
- }
- }
-
- if (!needToRemoveFetchers && mSwapMask == 0) {
- ALOGI("mSwitchInProgress = false");
- mStreamMask = mNewStreamMask;
- mSwitchInProgress = false;
- }
-}
-
void LiveSession::cancelBandwidthSwitch() {
- Mutex::Autolock lock(mSwapMutex);
+ ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++", mSwitchGeneration);
+
mSwitchGeneration++;
mSwitchInProgress = false;
mSwapMask = 0;
@@ -1760,6 +1825,7 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
int32_t bandwidthBps;
if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) {
ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
+ mLastBandwidthBps = bandwidthBps;
} else {
ALOGV("no bandwidth estimate.");
return;
@@ -1778,7 +1844,7 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
return;
}
- ALOGI("#### Initiate Bandwidth Switch: %d => %d",
+ ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
mCurBandwidthIndex, bandwidthIndex);
changeConfiguration(-1, bandwidthIndex, false);
}