summaryrefslogtreecommitdiffstats
path: root/media/libstagefright/httplive/LiveSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'media/libstagefright/httplive/LiveSession.cpp')
-rw-r--r--media/libstagefright/httplive/LiveSession.cpp666
1 files changed, 505 insertions, 161 deletions
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp
index 738f8b6..4ac2fb2 100644
--- a/media/libstagefright/httplive/LiveSession.cpp
+++ b/media/libstagefright/httplive/LiveSession.cpp
@@ -33,6 +33,7 @@
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
+#include <media/stagefright/foundation/AUtils.h>
#include <media/stagefright/DataSource.h>
#include <media/stagefright/FileSource.h>
#include <media/stagefright/MediaErrors.h>
@@ -49,12 +50,6 @@
namespace android {
-// static
-// High water mark to start up switch or report prepared)
-const int64_t LiveSession::kHighWaterMark = 8000000ll;
-const int64_t LiveSession::kMidWaterMark = 5000000ll;
-const int64_t LiveSession::kLowWaterMark = 3000000ll;
-
struct LiveSession::BandwidthEstimator : public RefBase {
BandwidthEstimator();
@@ -119,15 +114,34 @@ bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) {
return true;
}
+//static
+const char *LiveSession::getKeyForStream(StreamType type) {
+ switch (type) {
+ case STREAMTYPE_VIDEO:
+ return "timeUsVideo";
+ case STREAMTYPE_AUDIO:
+ return "timeUsAudio";
+ case STREAMTYPE_SUBTITLES:
+ return "timeUsSubtitle";
+ default:
+ TRESPASS();
+ }
+ return NULL;
+}
+
LiveSession::LiveSession(
const sp<AMessage> &notify, uint32_t flags,
const sp<IMediaHTTPService> &httpService)
: mNotify(notify),
mFlags(flags),
mHTTPService(httpService),
+ mBuffering(false),
mInPreparationPhase(true),
+ mPollBufferingGeneration(0),
+ mPrevBufferPercentage(-1),
mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
mCurBandwidthIndex(-1),
+ mOrigBandwidthIndex(-1),
mLastBandwidthBps(-1ll),
mBandwidthEstimator(new BandwidthEstimator()),
mStreamMask(0),
@@ -139,11 +153,12 @@ LiveSession::LiveSession(
mRealTimeBaseUs(0ll),
mReconfigurationInProgress(false),
mSwitchInProgress(false),
+ mUpSwitchMark(kUpSwitchMark),
+ mDownSwitchMark(kDownSwitchMark),
+ mUpSwitchMargin(kUpSwitchMargin),
mFirstTimeUsValid(false),
mFirstTimeUs(0),
- mLastSeekTimeUs(0),
- mPollBufferingGeneration(0) {
-
+ mLastSeekTimeUs(0) {
mStreams[kAudioIndex] = StreamItem("audio");
mStreams[kVideoIndex] = StreamItem("video");
mStreams[kSubtitleIndex] = StreamItem("subtitles");
@@ -162,12 +177,6 @@ LiveSession::~LiveSession() {
status_t LiveSession::dequeueAccessUnit(
StreamType stream, sp<ABuffer> *accessUnit) {
- if (!(mStreamMask & stream)) {
- // return -EWOULDBLOCK to avoid halting the decoder
- // when switching between audio/video and audio only.
- return -EWOULDBLOCK;
- }
-
status_t finalResult = OK;
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
@@ -225,26 +234,6 @@ status_t LiveSession::dequeueAccessUnit(
streamStr,
type,
extra == NULL ? "NULL" : extra->debugString().c_str());
-
- size_t seq = strm.mCurDiscontinuitySeq;
- int64_t offsetTimeUs;
- if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
- offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
- } else {
- offsetTimeUs = 0;
- }
-
- seq += 1;
- if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
- int64_t firstTimeUs;
- firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
- offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
- offsetTimeUs += strm.mLastSampleDurationUs;
- } else {
- offsetTimeUs += strm.mLastSampleDurationUs;
- }
-
- mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
} else if (err == OK) {
if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
@@ -252,7 +241,26 @@ status_t LiveSession::dequeueAccessUnit(
int32_t discontinuitySeq = 0;
CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
(*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
- strm.mCurDiscontinuitySeq = discontinuitySeq;
+ if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
+ int64_t offsetTimeUs;
+ if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
+ offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
+ } else {
+ offsetTimeUs = 0;
+ }
+
+ if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
+ int64_t firstTimeUs;
+ firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
+ offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
+ offsetTimeUs += strm.mLastSampleDurationUs;
+ } else {
+ offsetTimeUs += strm.mLastSampleDurationUs;
+ }
+
+ mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
+ strm.mCurDiscontinuitySeq = discontinuitySeq;
+ }
int32_t discard = 0;
int64_t firstTimeUs;
@@ -317,6 +325,11 @@ status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
return -EAGAIN;
}
+ if (stream == STREAMTYPE_AUDIO) {
+ // set AAC input buffer size to 32K bytes (256kbps x 1sec)
+ meta->setInt32(kKeyMaxInputSize, 32 * 1024);
+ }
+
return convertMetaDataToMessage(meta, format);
}
@@ -357,6 +370,102 @@ status_t LiveSession::seekTo(int64_t timeUs) {
return err;
}
+bool LiveSession::checkSwitchProgress(
+ sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
+ AString newUri;
+ CHECK(stopParams->findString("uri", &newUri));
+
+ *needResumeUntil = false;
+ sp<AMessage> firstNewMeta[kMaxStreams];
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ StreamType stream = indexToType(i);
+ if (!(mSwapMask & mNewStreamMask & stream)
+ || (mStreams[i].mNewUri != newUri)) {
+ continue;
+ }
+ if (stream == STREAMTYPE_SUBTITLES) {
+ continue;
+ }
+ sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
+
+ // First, get latest dequeued meta, which is where the decoder is at.
+ // (when upswitching, we take the meta after a certain delay, so that
+ // the decoder is left with some cushion)
+ sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
+ if (delayUs > 0) {
+ lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
+ } else {
+ lastDequeueMeta = source->getLatestDequeuedMeta();
+ }
+ // Then, trim off packets at beginning of mPacketSources2 that's before
+ // the latest dequeued time. These samples are definitely too late.
+ int64_t lastTimeUs, startTimeUs;
+ int32_t lastSeq, startSeq;
+ if (lastDequeueMeta != NULL) {
+ CHECK(lastDequeueMeta->findInt64("timeUs", &lastTimeUs));
+ CHECK(lastDequeueMeta->findInt32("discontinuitySeq", &lastSeq));
+ firstNewMeta[i] = mPacketSources2.editValueAt(i)
+ ->trimBuffersBeforeTimeUs(lastSeq, lastTimeUs);
+ }
+ // Now firstNewMeta[i] is the first sample after the trim.
+ // If it's NULL, we failed because dequeue already past all samples
+ // in mPacketSource2, we have to try again.
+ if (firstNewMeta[i] == NULL) {
+ ALOGV("[%s] dequeue time (%d, %lld) past start time",
+ stream == STREAMTYPE_AUDIO ? "audio" : "video",
+ lastSeq, (long long) lastTimeUs);
+ return false;
+ }
+
+ // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
+ // already fetched, and see if we need to resumeUntil
+ lastEnqueueMeta = source->getLatestEnqueuedMeta();
+ // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
+ // boundary, no need to resume as the content will look different anyways
+ if (lastEnqueueMeta != NULL) {
+ CHECK(lastEnqueueMeta->findInt64("timeUs", &lastTimeUs));
+ CHECK(lastEnqueueMeta->findInt32("discontinuitySeq", &lastSeq));
+ CHECK(firstNewMeta[i]->findInt64("timeUs", &startTimeUs));
+ CHECK(firstNewMeta[i]->findInt32("discontinuitySeq", &startSeq));
+
+ // no need to resume old fetcher if new fetcher started in different
+ // discontinuity sequence, as the content will look different.
+ *needResumeUntil |=
+ (startSeq == lastSeq
+ && startTimeUs - lastTimeUs > 100000ll);
+
+ // update the stopTime for resumeUntil, as we might have removed some
+ // packets from the head in mPacketSource2
+ stopParams->setInt64(getKeyForStream(stream), startTimeUs);
+ }
+ }
+
+ // if we're here, it means dequeue progress hasn't passed some samples in
+ // mPacketSource2, we can trim off the excess in mPacketSource.
+ // (old fetcher might still need to resumeUntil the start time of new fetcher)
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ StreamType stream = indexToType(i);
+ if (!(mSwapMask & mNewStreamMask & stream)
+ || (newUri != mStreams[i].mNewUri)) {
+ continue;
+ }
+ if (stream == STREAMTYPE_SUBTITLES) {
+ continue;
+ }
+ int64_t startTimeUs;
+ int32_t startSeq;
+ CHECK(firstNewMeta[i] != NULL);
+ CHECK(firstNewMeta[i]->findInt64("timeUs", &startTimeUs));
+ CHECK(firstNewMeta[i]->findInt32("discontinuitySeq", &startSeq));
+ mPacketSources.valueFor(stream)->trimBuffersAfterTimeUs(startSeq, startTimeUs);
+ }
+
+ // no resumeUntil if already underflow
+ *needResumeUntil &= !mBuffering;
+
+ return true;
+}
+
void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatConnect:
@@ -412,8 +521,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
}
if (what == PlaylistFetcher::kWhatStopped) {
- tryToFinishBandwidthSwitch(uri);
-
mFetcherLooper->unregisterHandler(
mFetcherInfos[index].mFetcher->id());
mFetcherInfos.removeItemsAt(index);
@@ -452,6 +559,16 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
+ case PlaylistFetcher::kWhatTargetDurationUpdate:
+ {
+ int64_t targetDurationUs;
+ CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
+ mUpSwitchMark = min(kUpSwitchMark, targetDurationUs * 3);
+ mDownSwitchMark = min(kDownSwitchMark, targetDurationUs * 9 / 4);
+ mUpSwitchMargin = min(kUpSwitchMargin, targetDurationUs);
+ break;
+ }
+
case PlaylistFetcher::kWhatError:
{
status_t err;
@@ -489,10 +606,23 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
mPacketSources.valueFor(
STREAMTYPE_SUBTITLES)->signalEOS(err);
- sp<AMessage> notify = mNotify->dup();
- notify->setInt32("what", kWhatError);
- notify->setInt32("err", err);
- notify->post();
+ postError(err);
+ break;
+ }
+
+ case PlaylistFetcher::kWhatStopReached:
+ {
+ ALOGV("kWhatStopReached");
+
+ AString uri;
+ CHECK(msg->findString("uri", &uri));
+
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index < 0) {
+ break;
+ }
+
+ tryToFinishBandwidthSwitch(uri);
break;
}
@@ -507,20 +637,67 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
AString uri;
CHECK(msg->findString("uri", &uri));
+
+ // mark new fetcher mToBeResumed
ssize_t index = mFetcherInfos.indexOfKey(uri);
if (index >= 0) {
mFetcherInfos.editValueAt(index).mToBeResumed = true;
}
- // Resume fetcher for the original variant; the resumed fetcher should
- // continue until the timestamps found in msg, which is stored by the
- // new fetcher to indicate where the new variant has started buffering.
- for (size_t i = 0; i < mFetcherInfos.size(); i++) {
- const FetcherInfo info = mFetcherInfos.valueAt(i);
- if (info.mToBeRemoved) {
- info.mFetcher->resumeUntilAsync(msg);
+ // temporarily disable packet sources to be swapped to prevent
+ // NuPlayerDecoder from dequeuing while we check progress
+ for (size_t i = 0; i < mPacketSources.size(); ++i) {
+ if ((mSwapMask & mPacketSources.keyAt(i))
+ && uri == mStreams[i].mNewUri) {
+ mPacketSources.editValueAt(i)->enable(false);
}
}
+ bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
+ // If switching up, require a cushion bigger than kUnderflowMark
+ // to avoid buffering immediately after the switch.
+ // (If we don't have that cushion we'd rather cancel and try again.)
+ int64_t delayUs = switchUp ? (kUnderflowMark + 1000000ll) : 0;
+ bool needResumeUntil = false;
+ sp<AMessage> stopParams = msg;
+ if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
+ // playback time hasn't passed startAt time
+ if (!needResumeUntil) {
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if ((mSwapMask & indexToType(i))
+ && uri == mStreams[i].mNewUri) {
+ // have to make a copy of mStreams[i].mUri because
+ // tryToFinishBandwidthSwitch is modifying mStreams[]
+ AString oldURI = mStreams[i].mUri;
+ tryToFinishBandwidthSwitch(oldURI);
+ break;
+ }
+ }
+ } else {
+ // startAt time is after last enqueue time
+ // Resume fetcher for the original variant; the resumed fetcher should
+ // continue until the timestamps found in msg, which is stored by the
+ // new fetcher to indicate where the new variant has started buffering.
+ for (size_t i = 0; i < mFetcherInfos.size(); i++) {
+ const FetcherInfo &info = mFetcherInfos.valueAt(i);
+ if (info.mToBeRemoved) {
+ info.mFetcher->resumeUntilAsync(stopParams);
+ }
+ }
+ }
+ } else {
+ // playback time passed startAt time
+ if (switchUp) {
+ // if switching up, cancel and retry if condition satisfies again
+ cancelBandwidthSwitch(true /* resume */);
+ } else {
+ resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
+ }
+ }
+ // re-enable all packet sources
+ for (size_t i = 0; i < mPacketSources.size(); ++i) {
+ mPacketSources.editValueAt(i)->enable(true);
+ }
+
break;
}
@@ -688,8 +865,6 @@ void LiveSession::onConnect(const sp<AMessage> &msg) {
mPlaylist->pickRandomMediaItems();
changeConfiguration(
0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
-
- schedulePollBuffering();
}
void LiveSession::finishDisconnect() {
@@ -950,6 +1125,43 @@ static double uniformRand() {
}
#endif
+bool LiveSession::resumeFetcher(
+ const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index < 0) {
+ ALOGE("did not find fetcher for uri: %s", uri.c_str());
+ return false;
+ }
+
+ bool resume = false;
+ sp<AnotherPacketSource> sources[kMaxStreams];
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if ((streamMask & indexToType(i))
+ && ((!newUri && uri == mStreams[i].mUri)
+ || (newUri && uri == mStreams[i].mNewUri))) {
+ resume = true;
+ if (newUri) {
+ sources[i] = mPacketSources2.valueFor(indexToType(i));
+ sources[i]->clear();
+ } else {
+ sources[i] = mPacketSources.valueFor(indexToType(i));
+ }
+ }
+ }
+
+ if (resume) {
+ ALOGV("resuming fetcher %s, timeUs %lld", uri.c_str(), (long long)timeUs);
+ SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
+ mFetcherInfos.editValueAt(index).mFetcher->startAsync(
+ sources[kAudioIndex],
+ sources[kVideoIndex],
+ sources[kSubtitleIndex],
+ timeUs, -1, -1, seekMode);
+ }
+
+ return resume;
+}
+
float LiveSession::getAbortThreshold(
ssize_t currentBWIndex, ssize_t targetBWIndex) const {
float abortThreshold = -1.0f;
@@ -983,12 +1195,17 @@ float LiveSession::getAbortThreshold(
X/T < bw1 / (bw1 + bw0 - bw)
*/
+ // Taking the measured current bandwidth at 50% face value only,
+ // as our bandwidth estimation is a lagging indicator. Being
+ // conservative on this, we prefer switching to lower bandwidth
+ // unless we're really confident finishing up the last segment
+ // of higher bandwidth will be fast.
CHECK(mLastBandwidthBps >= 0);
abortThreshold =
(float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
/ ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
+ (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
- - (float)mLastBandwidthBps * 0.7f);
+ - (float)mLastBandwidthBps * 0.5f);
if (abortThreshold < 0.0f) {
abortThreshold = -1.0f; // do not abort
}
@@ -1128,7 +1345,7 @@ status_t LiveSession::onSeek(const sp<AMessage> &msg) {
CHECK(msg->findInt64("timeUs", &timeUs));
if (!mReconfigurationInProgress) {
- changeConfiguration(timeUs, mCurBandwidthIndex);
+ changeConfiguration(timeUs);
return OK;
} else {
return -EWOULDBLOCK;
@@ -1184,7 +1401,6 @@ status_t LiveSession::selectTrack(size_t index, bool select) {
status_t err = mPlaylist->selectTrack(index, select);
if (err == OK) {
sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
- msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
msg->setInt32("pickTrack", select);
msg->post();
}
@@ -1200,19 +1416,17 @@ ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
}
void LiveSession::changeConfiguration(
- int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
- // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
- // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
+ int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
cancelBandwidthSwitch();
CHECK(!mReconfigurationInProgress);
mReconfigurationInProgress = true;
-
- ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
- timeUs, bandwidthIndex, pickTrack);
-
- CHECK_LT(bandwidthIndex, mBandwidthItems.size());
- const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
+ if (bandwidthIndex >= 0) {
+ mOrigBandwidthIndex = mCurBandwidthIndex;
+ mCurBandwidthIndex = bandwidthIndex;
+ }
+ CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
+ const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
@@ -1227,6 +1441,12 @@ void LiveSession::changeConfiguration(
// Step 1, stop and discard fetchers that are no longer needed.
// Pause those that we'll reuse.
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
+ // skip fetchers that are marked mToBeRemoved,
+ // these are done and can't be reused
+ if (mFetcherInfos[i].mToBeRemoved) {
+ continue;
+ }
+
const AString &uri = mFetcherInfos.keyAt(i);
bool discardFetcher = true;
@@ -1255,7 +1475,7 @@ void LiveSession::changeConfiguration(
} else if (!pickTrack) {
// adapting, abort if remaining of current segment is over threshold
threshold = getAbortThreshold(
- mCurBandwidthIndex, bandwidthIndex);
+ mOrigBandwidthIndex, mCurBandwidthIndex);
}
ALOGV("Pausing with threshold %.3f", threshold);
@@ -1264,8 +1484,6 @@ void LiveSession::changeConfiguration(
}
}
- mCurBandwidthIndex = bandwidthIndex;
-
sp<AMessage> msg;
if (timeUs < 0ll) {
// skip onChangeConfiguration2 (decoder destruction) if not seeking.
@@ -1297,10 +1515,9 @@ void LiveSession::changeConfiguration(
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
if (!mReconfigurationInProgress) {
- int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
+ int32_t pickTrack = 0;
msg->findInt32("pickTrack", &pickTrack);
- msg->findInt32("bandwidthIndex", &bandwidthIndex);
- changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
+ changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
} else {
msg->post(1000000ll); // retry in 1 sec
}
@@ -1323,6 +1540,10 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
mPacketSources.editValueAt(i)->clear();
}
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ mStreams[i].mCurDiscontinuitySeq = 0;
+ }
+
mDiscontinuityOffsetTimesUs.clear();
mDiscontinuityAbsStartTimesUs.clear();
@@ -1333,6 +1554,10 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
mSeekReplyID.clear();
mSeekReply.clear();
}
+
+ // restart buffer polling after seek becauese previous
+ // buffering position is no longer valid.
+ restartPollBuffering();
}
uint32_t streamMask, resumeMask;
@@ -1407,12 +1632,14 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
int64_t timeUs;
int32_t pickTrack;
bool switching = false;
+ bool finishSwitching = false;
CHECK(msg->findInt64("timeUs", &timeUs));
CHECK(msg->findInt32("pickTrack", &pickTrack));
if (timeUs < 0ll) {
if (!pickTrack) {
switching = true;
+ finishSwitching = (streamMask == 0);
}
mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
} else {
@@ -1440,20 +1667,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
const AString &uri = mFetcherInfos.keyAt(i);
-
- sp<AnotherPacketSource> sources[kMaxStreams];
- for (size_t j = 0; j < kMaxStreams; ++j) {
- if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
- sources[j] = mPacketSources.valueFor(indexToType(j));
- }
- }
- FetcherInfo &info = mFetcherInfos.editValueAt(i);
- if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
- || sources[kSubtitleIndex] != NULL) {
- info.mFetcher->startAsync(
- sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], timeUs);
- } else {
- info.mToBeRemoved = true;
+ if (!resumeFetcher(uri, resumeMask, timeUs)) {
+ mFetcherInfos.editValueAt(i).mToBeRemoved = true;
}
}
@@ -1512,24 +1727,42 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
} else {
// adapting
meta = sources[j]->getLatestEnqueuedMeta();
+ if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
+ // switching up
+ meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
+ }
}
- if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
+ if (j != kSubtitleIndex
+ && meta != NULL
+ && !meta->findInt32("discontinuity", &type)) {
int64_t tmpUs;
int64_t tmpSegmentUs;
+ int32_t seq;
CHECK(meta->findInt64("timeUs", &tmpUs));
CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs));
- if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) {
+ CHECK(meta->findInt32("discontinuitySeq", &seq));
+ // If we're switching and looking for next sample or segment, set the target
+ // segment start time to tmpSegmentUs + tmpDurationUs / 2, which is
+ // the middle point of the segment where the last sample was.
+ // This is needed if segments of the two variants are not perfectly
+ // aligned. (If the corresponding segment in new variant starts slightly
+ // later than that in the old variant, we still want the switching to
+ // start in the next one, not the current one)
+ if (mStreams[j].mSeekMode == kSeekModeNextSample
+ || mStreams[j].mSeekMode == kSeekModeNextSegment) {
+ int64_t tmpDurationUs;
+ CHECK(meta->findInt64("segmentDurationUs", &tmpDurationUs));
+ tmpSegmentUs += tmpDurationUs / 2;
+ }
+ if (startTimeUs < 0 || seq > discontinuitySeq
+ || (seq == discontinuitySeq
+ && (tmpSegmentUs > segmentStartTimeUs
+ || (tmpSegmentUs == segmentStartTimeUs
+ && tmpUs > startTimeUs)))) {
startTimeUs = tmpUs;
segmentStartTimeUs = tmpSegmentUs;
- } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) {
- startTimeUs = tmpUs;
- }
-
- int32_t seq;
- CHECK(meta->findInt32("discontinuitySeq", &seq));
- if (discontinuitySeq < 0 || seq < discontinuitySeq) {
discontinuitySeq = seq;
}
}
@@ -1585,8 +1818,21 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
mReconfigurationInProgress = false;
if (switching) {
mSwitchInProgress = true;
+
+ if (finishSwitching) {
+ // Switch is finished now, no new fetchers are created.
+ // This path is hit when old variant had video and audio from
+ // two separate fetchers, while new variant has audio only,
+ // which reuses the previous audio fetcher.
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if (mSwapMask & indexToType(i)) {
+ tryToFinishBandwidthSwitch(mStreams[i].mUri);
+ }
+ }
+ }
} else {
mStreamMask = mNewStreamMask;
+ mOrigBandwidthIndex = mCurBandwidthIndex;
}
if (mDisconnectReplyID != NULL) {
@@ -1614,21 +1860,20 @@ void LiveSession::swapPacketSource(StreamType stream) {
aps2->clear();
}
-void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) {
+void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
if (!mSwitchInProgress) {
return;
}
- ssize_t index = mFetcherInfos.indexOfKey(uri);
+ ssize_t index = mFetcherInfos.indexOfKey(oldUri);
if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
return;
}
// Swap packet source of streams provided by old variant
for (size_t idx = 0; idx < kMaxStreams; idx++) {
- if (uri == mStreams[idx].mUri) {
- StreamType stream = indexToType(idx);
-
+ StreamType stream = indexToType(idx);
+ if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
swapPacketSource(stream);
if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
@@ -1642,7 +1887,7 @@ void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) {
}
}
- mFetcherInfos.editValueAt(index).mToBeRemoved = false;
+ mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
ALOGV("tryToFinishBandwidthSwitch: mSwapMask=%x", mSwapMask);
if (mSwapMask != 0) {
@@ -1672,30 +1917,20 @@ void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) {
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
FetcherInfo &info = mFetcherInfos.editValueAt(i);
if (info.mToBeResumed) {
- const AString &uri = mFetcherInfos.keyAt(i);
- sp<AnotherPacketSource> sources[kMaxStreams];
- for (size_t j = 0; j < kMaxStreams; ++j) {
- if (uri == mStreams[j].mUri) {
- sources[j] = mPacketSources.valueFor(indexToType(j));
- }
- }
- if (sources[kAudioIndex] != NULL
- || sources[kVideoIndex] != NULL
- || sources[kSubtitleIndex] != NULL) {
- ALOGV("resuming fetcher %s", uri.c_str());
- info.mFetcher->startAsync(
- sources[kAudioIndex],
- sources[kVideoIndex],
- sources[kSubtitleIndex]);
- }
+ resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
info.mToBeResumed = false;
}
}
+ ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
+ mOrigBandwidthIndex, mCurBandwidthIndex);
+
mStreamMask = mNewStreamMask;
mSwitchInProgress = false;
+ mOrigBandwidthIndex = mCurBandwidthIndex;
- ALOGI("#### Finished Bandwidth Switch");
+
+ restartPollBuffering();
}
void LiveSession::schedulePollBuffering() {
@@ -1706,6 +1941,12 @@ void LiveSession::schedulePollBuffering() {
void LiveSession::cancelPollBuffering() {
++mPollBufferingGeneration;
+ mPrevBufferPercentage = -1;
+}
+
+void LiveSession::restartPollBuffering() {
+ cancelPollBuffering();
+ onPollBuffering();
}
void LiveSession::onPollBuffering() {
@@ -1714,70 +1955,90 @@ void LiveSession::onPollBuffering() {
mSwitchInProgress, mReconfigurationInProgress,
mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
- bool low, mid, high;
- if (checkBuffering(low, mid, high)) {
- if (mInPreparationPhase && mid) {
+ bool underflow, ready, down, up;
+ if (checkBuffering(underflow, ready, down, up)) {
+ if (mInPreparationPhase && ready) {
postPrepared(OK);
}
// don't switch before we report prepared
if (!mInPreparationPhase) {
- switchBandwidthIfNeeded(high, !mid);
- }
+ if (ready) {
+ stopBufferingIfNecessary();
+ } else if (underflow) {
+ startBufferingIfNecessary();
+ }
+ switchBandwidthIfNeeded(up, down);
+ }
+
}
schedulePollBuffering();
}
-void LiveSession::cancelBandwidthSwitch() {
- ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++", mSwitchGeneration);
-
- mSwitchGeneration++;
- mSwitchInProgress = false;
- mSwapMask = 0;
+void LiveSession::cancelBandwidthSwitch(bool resume) {
+ ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
+ mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
+ if (!mSwitchInProgress) {
+ return;
+ }
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
FetcherInfo& info = mFetcherInfos.editValueAt(i);
if (info.mToBeRemoved) {
info.mToBeRemoved = false;
+ if (resume) {
+ resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
+ }
}
}
for (size_t i = 0; i < kMaxStreams; ++i) {
- if (!mStreams[i].mNewUri.empty()) {
- ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
- if (j < 0) {
- mStreams[i].mNewUri.clear();
+ AString newUri = mStreams[i].mNewUri;
+ if (!newUri.empty()) {
+ // clear all mNewUri matching this newUri
+ for (size_t j = i; j < kMaxStreams; ++j) {
+ if (mStreams[j].mNewUri == newUri) {
+ mStreams[j].mNewUri.clear();
+ }
+ }
+ ALOGV("stopping newUri = %s", newUri.c_str());
+ ssize_t index = mFetcherInfos.indexOfKey(newUri);
+ if (index < 0) {
+ ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
continue;
}
-
- const FetcherInfo &info = mFetcherInfos.valueAt(j);
+ FetcherInfo &info = mFetcherInfos.editValueAt(index);
+ info.mToBeRemoved = true;
info.mFetcher->stopAsync();
- mFetcherInfos.removeItemsAt(j);
- mStreams[i].mNewUri.clear();
}
}
+
+ ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
+ mCurBandwidthIndex, mOrigBandwidthIndex);
+
+ mSwitchGeneration++;
+ mSwitchInProgress = false;
+ mCurBandwidthIndex = mOrigBandwidthIndex;
+ mSwapMask = 0;
}
-bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) {
- low = mid = high = false;
+bool LiveSession::checkBuffering(
+ bool &underflow, bool &ready, bool &down, bool &up) {
+ underflow = ready = down = up = false;
- if (mSwitchInProgress || mReconfigurationInProgress) {
+ if (mReconfigurationInProgress) {
ALOGV("Switch/Reconfig in progress, defer buffer polling");
return false;
}
- // TODO: Fine tune low/high mark.
- // We also need to pause playback if buffering is too low.
- // Currently during underflow, we depend on decoder to starve
- // to pause, but A/V could have different buffering left,
- // they're not paused together.
- // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE
-
- // Switch down if any of the fetchers are below low mark;
- // Switch up if all of the fetchers are over high mark.
- size_t activeCount, lowCount, midCount, highCount;
- activeCount = lowCount = midCount = highCount = 0;
+ size_t activeCount, underflowCount, readyCount, downCount, upCount;
+ activeCount = underflowCount = readyCount = downCount = upCount =0;
+ int32_t minBufferPercent = -1;
+ int64_t durationUs;
+ if (getDuration(&durationUs) != OK) {
+ durationUs = -1;
+ }
for (size_t i = 0; i < mPacketSources.size(); ++i) {
// we don't check subtitles for buffering level
if (!(mStreamMask & mPacketSources.keyAt(i)
@@ -1791,34 +2052,99 @@ bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) {
continue;
}
- ++activeCount;
int64_t bufferedDurationUs =
mPacketSources[i]->getEstimatedDurationUs();
ALOGV("source[%zu]: buffered %lld us", i, (long long)bufferedDurationUs);
- if (bufferedDurationUs < kLowWaterMark) {
- ++lowCount;
- break;
- } else if (bufferedDurationUs > kHighWaterMark) {
- ++midCount;
- ++highCount;
- } else if (bufferedDurationUs > kMidWaterMark) {
- ++midCount;
+ if (durationUs >= 0) {
+ int32_t percent;
+ if (mPacketSources[i]->isFinished(0 /* duration */)) {
+ percent = 100;
+ } else {
+ percent = (int32_t)(100.0 * (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
+ }
+ if (minBufferPercent < 0 || percent < minBufferPercent) {
+ minBufferPercent = percent;
+ }
}
+
+ ++activeCount;
+ int64_t readyMark = mInPreparationPhase ? kPrepareMark : kReadyMark;
+ if (bufferedDurationUs > readyMark
+ || mPacketSources[i]->isFinished(0)) {
+ ++readyCount;
+ }
+ if (!mPacketSources[i]->isFinished(0)) {
+ if (bufferedDurationUs < kUnderflowMark) {
+ ++underflowCount;
+ }
+ if (bufferedDurationUs > mUpSwitchMark) {
+ ++upCount;
+ } else if (bufferedDurationUs < mDownSwitchMark) {
+ ++downCount;
+ }
+ }
+ }
+
+ if (minBufferPercent >= 0) {
+ notifyBufferingUpdate(minBufferPercent);
}
if (activeCount > 0) {
- high = (highCount == activeCount);
- mid = (midCount == activeCount);
- low = (lowCount > 0);
+ up = (upCount == activeCount);
+ down = (downCount > 0);
+ ready = (readyCount == activeCount);
+ underflow = (underflowCount > 0);
return true;
}
return false;
}
+void LiveSession::startBufferingIfNecessary() {
+ ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
+ mInPreparationPhase, mBuffering);
+ if (!mBuffering) {
+ mBuffering = true;
+
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatBufferingStart);
+ notify->post();
+ }
+}
+
+void LiveSession::stopBufferingIfNecessary() {
+ ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
+ mInPreparationPhase, mBuffering);
+
+ if (mBuffering) {
+ mBuffering = false;
+
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatBufferingEnd);
+ notify->post();
+ }
+}
+
+void LiveSession::notifyBufferingUpdate(int32_t percentage) {
+ if (percentage < mPrevBufferPercentage) {
+ percentage = mPrevBufferPercentage;
+ } else if (percentage > 100) {
+ percentage = 100;
+ }
+
+ mPrevBufferPercentage = percentage;
+
+ ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
+
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatBufferingUpdate);
+ notify->setInt32("percentage", percentage);
+ notify->post();
+}
+
void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
// no need to check bandwidth if we only have 1 bandwidth settings
- if (mBandwidthItems.size() < 2) {
+ if (mSwitchInProgress || mBandwidthItems.size() < 2) {
return;
}
@@ -1850,6 +2176,22 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
}
}
+void LiveSession::postError(status_t err) {
+ // if we reached EOS, notify buffering of 100%
+ if (err == ERROR_END_OF_STREAM) {
+ notifyBufferingUpdate(100);
+ }
+ // we'll stop buffer polling now, before that notify
+ // stop buffering to stop the spinning icon
+ stopBufferingIfNecessary();
+ cancelPollBuffering();
+
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatError);
+ notify->setInt32("err", err);
+ notify->post();
+}
+
void LiveSession::postPrepared(status_t err) {
CHECK(mInPreparationPhase);
@@ -1857,6 +2199,8 @@ void LiveSession::postPrepared(status_t err) {
if (err == OK || err == ERROR_END_OF_STREAM) {
notify->setInt32("what", kWhatPrepared);
} else {
+ cancelPollBuffering();
+
notify->setInt32("what", kWhatPreparationFailed);
notify->setInt32("err", err);
}