summaryrefslogtreecommitdiffstats
path: root/media/libstagefright/httplive/LiveSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'media/libstagefright/httplive/LiveSession.cpp')
-rw-r--r--media/libstagefright/httplive/LiveSession.cpp343
1 files changed, 147 insertions, 196 deletions
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp
index ea4aab6..a8f60a8 100644
--- a/media/libstagefright/httplive/LiveSession.cpp
+++ b/media/libstagefright/httplive/LiveSession.cpp
@@ -49,8 +49,13 @@
namespace android {
+// static
// Number of recently-read bytes to use for bandwidth estimation
const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024;
+// High water mark to start up switch or report prepared)
+const int64_t LiveSession::kHighWaterMark = 8000000ll;
+const int64_t LiveSession::kMidWaterMark = 5000000ll;
+const int64_t LiveSession::kLowWaterMark = 3000000ll;
LiveSession::LiveSession(
const sp<AMessage> &notify, uint32_t flags,
@@ -75,14 +80,14 @@ LiveSession::LiveSession(
mSeekReplyID(0),
mFirstTimeUsValid(false),
mFirstTimeUs(0),
- mLastSeekTimeUs(0) {
+ mLastSeekTimeUs(0),
+ mPollBufferingGeneration(0) {
mStreams[kAudioIndex] = StreamItem("audio");
mStreams[kVideoIndex] = StreamItem("video");
mStreams[kSubtitleIndex] = StreamItem("subtitles");
for (size_t i = 0; i < kMaxStreams; ++i) {
- mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
mBuffering[i] = false;
@@ -97,6 +102,9 @@ LiveSession::LiveSession(
}
LiveSession::~LiveSession() {
+ if (mFetcherLooper != NULL) {
+ mFetcherLooper->stop();
+ }
}
sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
@@ -125,24 +133,7 @@ status_t LiveSession::dequeueAccessUnit(
return -EWOULDBLOCK;
}
- status_t finalResult;
- sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream);
- if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
- discontinuityQueue->dequeueAccessUnit(accessUnit);
- // seeking, track switching
- sp<AMessage> extra;
- int64_t timeUs;
- if ((*accessUnit)->meta()->findMessage("extra", &extra)
- && extra != NULL
- && extra->findInt64("timeUs", &timeUs)) {
- // seeking only
- mLastSeekTimeUs = timeUs;
- mDiscontinuityOffsetTimesUs.clear();
- mDiscontinuityAbsStartTimesUs.clear();
- }
- return INFO_DISCONTINUITY;
- }
-
+ status_t finalResult = OK;
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
ssize_t idx = typeToIndex(stream);
@@ -172,7 +163,7 @@ status_t LiveSession::dequeueAccessUnit(
if (mBuffering[idx]) {
if (mSwitchInProgress
|| packetSource->isFinished(0)
- || packetSource->getEstimatedDurationUs() > targetDurationUs) {
+ || packetSource->hasBufferAvailable(&finalResult)) {
mBuffering[idx] = false;
}
}
@@ -429,11 +420,16 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
if (what == PlaylistFetcher::kWhatStopped) {
AString uri;
CHECK(msg->findString("uri", &uri));
- if (mFetcherInfos.removeItem(uri) < 0) {
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index < 0) {
// ignore duplicated kWhatStopped messages.
break;
}
+ mFetcherLooper->unregisterHandler(
+ mFetcherInfos[index].mFetcher->id());
+ mFetcherInfos.removeItemsAt(index);
+
if (mSwitchInProgress) {
tryToFinishBandwidthSwitch();
}
@@ -443,14 +439,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
CHECK_GT(mContinuationCounter, 0);
if (--mContinuationCounter == 0) {
mContinuation->post();
-
- if (mSeekReplyID != 0) {
- CHECK(mSeekReply != NULL);
- mSeekReply->setInt32("err", OK);
- mSeekReply->postReply(mSeekReplyID);
- mSeekReplyID = 0;
- mSeekReply.clear();
- }
}
}
break;
@@ -464,8 +452,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
int64_t durationUs;
CHECK(msg->findInt64("durationUs", &durationUs));
- FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
- info->mDurationUs = durationUs;
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index >= 0) {
+ FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
+ info->mDurationUs = durationUs;
+ }
break;
}
@@ -513,34 +504,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
- case PlaylistFetcher::kWhatTemporarilyDoneFetching:
- {
- AString uri;
- CHECK(msg->findString("uri", &uri));
-
- if (mFetcherInfos.indexOfKey(uri) < 0) {
- ALOGE("couldn't find uri");
- break;
- }
- FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
- info->mIsPrepared = true;
-
- if (mInPreparationPhase) {
- bool allFetchersPrepared = true;
- for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
- if (!mFetcherInfos.valueAt(i).mIsPrepared) {
- allFetchersPrepared = false;
- break;
- }
- }
-
- if (allFetchersPrepared) {
- postPrepared(OK);
- }
- }
- break;
- }
-
case PlaylistFetcher::kWhatStartedAt:
{
int32_t switchGeneration;
@@ -569,19 +532,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
- case kWhatCheckBandwidth:
- {
- int32_t generation;
- CHECK(msg->findInt32("generation", &generation));
-
- if (generation != mCheckBandwidthGeneration) {
- break;
- }
-
- onCheckBandwidth(msg);
- break;
- }
-
case kWhatChangeConfiguration:
{
onChangeConfiguration(msg);
@@ -612,15 +562,13 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
- case kWhatCheckSwitchDown:
+ case kWhatPollBuffering:
{
- onCheckSwitchDown();
- break;
- }
-
- case kWhatSwitchDown:
- {
- onSwitchDown();
+ int32_t generation;
+ CHECK(msg->findInt32("generation", &generation));
+ if (generation == mPollBufferingGeneration) {
+ onPollBuffering();
+ }
break;
}
@@ -691,6 +639,14 @@ void LiveSession::onConnect(const sp<AMessage> &msg) {
return;
}
+ // create looper for fetchers
+ if (mFetcherLooper == NULL) {
+ mFetcherLooper = new ALooper();
+
+ mFetcherLooper->setName("Fetcher");
+ mFetcherLooper->start(false, false);
+ }
+
// We trust the content provider to make a reasonable choice of preferred
// initial bandwidth by listing it first in the variant playlist.
// At startup we really don't have a good estimate on the available
@@ -739,19 +695,20 @@ void LiveSession::onConnect(const sp<AMessage> &msg) {
mPlaylist->pickRandomMediaItems();
changeConfiguration(
0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
+
+ schedulePollBuffering();
}
void LiveSession::finishDisconnect() {
// No reconfiguration is currently pending, make sure none will trigger
// during disconnection either.
- cancelCheckBandwidthEvent();
// Protect mPacketSources from a swapPacketSource race condition through disconnect.
// (finishDisconnect, onFinishDisconnect2)
cancelBandwidthSwitch();
- // cancel switch down monitor
- mSwitchDownMonitor.clear();
+ // cancel buffer polling
+ cancelPollBuffering();
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
mFetcherInfos.valueAt(i).mFetcher->stopAsync();
@@ -799,7 +756,7 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
info.mDurationUs = -1ll;
info.mIsPrepared = false;
info.mToBeRemoved = false;
- looper()->registerHandler(info.mFetcher);
+ mFetcherLooper->registerHandler(info.mFetcher);
mFetcherInfos.add(uri, info);
@@ -1201,19 +1158,6 @@ ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
}
}
-bool LiveSession::canSwitchUp() {
- // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
- status_t err = OK;
- for (size_t i = 0; i < mPacketSources.size(); ++i) {
- sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
- int64_t dur = source->getBufferedDurationUs(&err);
- if (err == OK && dur > 10000000) {
- return true;
- }
- }
- return false;
-}
-
void LiveSession::changeConfiguration(
int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
// Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
@@ -1296,14 +1240,6 @@ void LiveSession::changeConfiguration(
if (mContinuationCounter == 0) {
msg->post();
-
- if (mSeekReplyID != 0) {
- CHECK(mSeekReply != NULL);
- mSeekReply->setInt32("err", OK);
- mSeekReply->postReply(mSeekReplyID);
- mSeekReplyID = 0;
- mSeekReply.clear();
- }
}
}
@@ -1323,6 +1259,30 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
// All fetchers are either suspended or have been removed now.
+ // If we're seeking, clear all packet sources before we report
+ // seek complete, to prevent decoder from pulling stale data.
+ int64_t timeUs;
+ CHECK(msg->findInt64("timeUs", &timeUs));
+
+ if (timeUs >= 0) {
+ mLastSeekTimeUs = timeUs;
+
+ for (size_t i = 0; i < mPacketSources.size(); i++) {
+ mPacketSources.editValueAt(i)->clear();
+ }
+
+ mDiscontinuityOffsetTimesUs.clear();
+ mDiscontinuityAbsStartTimesUs.clear();
+
+ if (mSeekReplyID != 0) {
+ CHECK(mSeekReply != NULL);
+ mSeekReply->setInt32("err", OK);
+ mSeekReply->postReply(mSeekReplyID);
+ mSeekReplyID = 0;
+ mSeekReply.clear();
+ }
+ }
+
uint32_t streamMask, resumeMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
@@ -1428,19 +1388,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
for (size_t j = 0; j < kMaxStreams; ++j) {
if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
sources[j] = mPacketSources.valueFor(indexToType(j));
-
- if (j != kSubtitleIndex) {
- ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
- sp<AnotherPacketSource> discontinuityQueue;
- discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
- discontinuityQueue->queueDiscontinuity(
- ATSParser::DISCONTINUITY_NONE,
- NULL,
- true);
- }
}
}
-
FetcherInfo &info = mFetcherInfos.editValueAt(i);
if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
|| sources[kSubtitleIndex] != NULL) {
@@ -1486,15 +1435,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
sources[j] = mPacketSources.valueFor(indexToType(j));
if (timeUs >= 0) {
- sources[j]->clear();
startTimeUs = timeUs;
-
- sp<AnotherPacketSource> discontinuityQueue;
- sp<AMessage> extra = new AMessage;
- extra->setInt64("timeUs", timeUs);
- discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
- discontinuityQueue->queueDiscontinuity(
- ATSParser::DISCONTINUITY_TIME, extra, true);
} else {
int32_t type;
sp<AMessage> meta;
@@ -1532,9 +1473,10 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
if (j == kSubtitleIndex) {
break;
}
- sp<AnotherPacketSource> discontinuityQueue;
- discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
- discontinuityQueue->queueDiscontinuity(
+
+ ALOGV("stream[%d]: queue format change", j);
+
+ sources[j]->queueDiscontinuity(
ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
} else {
// adapting, queue discontinuities after resume
@@ -1564,9 +1506,6 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
// All fetchers have now been started, the configuration change
// has completed.
- cancelCheckBandwidthEvent();
- scheduleCheckBandwidthEvent();
-
ALOGV("XXX configuration change completed.");
mReconfigurationInProgress = false;
if (switching) {
@@ -1623,47 +1562,35 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) {
tryToFinishBandwidthSwitch();
}
-void LiveSession::onCheckSwitchDown() {
- if (mSwitchDownMonitor == NULL) {
- return;
- }
-
- if (mSwitchInProgress || mReconfigurationInProgress) {
- ALOGV("Switch/Reconfig in progress, defer switch down");
- mSwitchDownMonitor->post(1000000ll);
- return;
- }
+void LiveSession::schedulePollBuffering() {
+ sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
+ msg->setInt32("generation", mPollBufferingGeneration);
+ msg->post(1000000ll);
+}
- for (size_t i = 0; i < kMaxStreams; ++i) {
- int32_t targetDuration;
- sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
- sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
+void LiveSession::cancelPollBuffering() {
+ ++mPollBufferingGeneration;
+}
- if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
- int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
- int64_t targetDurationUs = targetDuration * 1000000ll;
+void LiveSession::onPollBuffering() {
+ ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
+ "mInPreparationPhase %d, mStreamMask 0x%x",
+ mSwitchInProgress, mReconfigurationInProgress,
+ mInPreparationPhase, mStreamMask);
- if (bufferedDurationUs < targetDurationUs / 3) {
- (new AMessage(kWhatSwitchDown, this))->post();
- break;
- }
+ bool low, mid, high;
+ if (checkBuffering(low, mid, high)) {
+ if (mInPreparationPhase && mid) {
+ postPrepared(OK);
}
- }
-
- mSwitchDownMonitor->post(1000000ll);
-}
-
-void LiveSession::onSwitchDown() {
- if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
- return;
- }
- ssize_t bandwidthIndex = getBandwidthIndex();
- if (bandwidthIndex < mCurBandwidthIndex) {
- changeConfiguration(-1, bandwidthIndex, false);
- return;
+ // don't switch before we report prepared
+ if (!mInPreparationPhase && (low || high)) {
+ switchBandwidthIfNeeded(high);
+ }
}
+ schedulePollBuffering();
}
// Mark switch done when:
@@ -1688,16 +1615,6 @@ void LiveSession::tryToFinishBandwidthSwitch() {
}
}
-void LiveSession::scheduleCheckBandwidthEvent() {
- sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, this);
- msg->setInt32("generation", mCheckBandwidthGeneration);
- msg->post(10000000ll);
-}
-
-void LiveSession::cancelCheckBandwidthEvent() {
- ++mCheckBandwidthGeneration;
-}
-
void LiveSession::cancelBandwidthSwitch() {
Mutex::Autolock lock(mSwapMutex);
mSwitchGeneration++;
@@ -1727,33 +1644,69 @@ void LiveSession::cancelBandwidthSwitch() {
}
}
-bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
- if (mReconfigurationInProgress || mSwitchInProgress) {
+bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) {
+ low = mid = high = false;
+
+ if (mSwitchInProgress || mReconfigurationInProgress) {
+ ALOGV("Switch/Reconfig in progress, defer buffer polling");
return false;
}
- if (mCurBandwidthIndex < 0) {
- return true;
+ // TODO: Fine tune low/high mark.
+ // We also need to pause playback if buffering is too low.
+ // Currently during underflow, we depend on decoder to starve
+ // to pause, but A/V could have different buffering left,
+ // they're not paused together.
+ // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE
+
+ // Switch down if any of the fetchers are below low mark;
+ // Switch up if all of the fetchers are over high mark.
+ size_t activeCount, lowCount, midCount, highCount;
+ activeCount = lowCount = midCount = highCount = 0;
+ for (size_t i = 0; i < mPacketSources.size(); ++i) {
+ // we don't check subtitles for buffering level
+ if (!(mStreamMask & mPacketSources.keyAt(i)
+ & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) {
+ continue;
+ }
+ // ignore streams that never had any packet queued.
+ // (it's possible that the variant only has audio or video)
+ sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
+ if (meta == NULL) {
+ continue;
+ }
+
+ ++activeCount;
+ int64_t bufferedDurationUs =
+ mPacketSources[i]->getEstimatedDurationUs();
+ ALOGV("source[%d]: buffered %lld us", i, bufferedDurationUs);
+ if (bufferedDurationUs < kLowWaterMark) {
+ ++lowCount;
+ break;
+ } else if (bufferedDurationUs > kHighWaterMark) {
+ ++midCount;
+ ++highCount;
+ } else if (bufferedDurationUs > kMidWaterMark) {
+ ++midCount;
+ }
}
- if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
- return false;
- } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
- return canSwitchUp();
- } else {
+ if (activeCount > 0) {
+ high = (highCount == activeCount);
+ mid = (midCount == activeCount);
+ low = (lowCount > 0);
return true;
}
+
+ return false;
}
-void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
- size_t bandwidthIndex = getBandwidthIndex();
- if (canSwitchBandwidthTo(bandwidthIndex)) {
- changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
- } else {
- // Come back and check again 10 seconds later in case there is nothing to do now.
- // If we DO change configuration, once that completes it'll schedule a new
- // check bandwidth event with an incremented mCheckBandwidthGeneration.
- msg->post(10000000ll);
+void LiveSession::switchBandwidthIfNeeded(bool canSwitchUp) {
+ ssize_t bandwidthIndex = getBandwidthIndex();
+
+ if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
+ || (!canSwitchUp && bandwidthIndex < mCurBandwidthIndex)) {
+ changeConfiguration(-1, bandwidthIndex, false);
}
}
@@ -1771,10 +1724,8 @@ void LiveSession::postPrepared(status_t err) {
notify->post();
mInPreparationPhase = false;
-
- mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, this);
- mSwitchDownMonitor->post();
}
+
} // namespace android