summaryrefslogtreecommitdiffstats
path: root/media
diff options
context:
space:
mode:
authorChong Zhang <chz@google.com>2015-03-27 22:37:47 +0000
committerAndroid (Google) Code Review <android-gerrit@google.com>2015-03-27 22:37:48 +0000
commit9bb23ff63e17b91a8480b142975067e8b6ab69ec (patch)
tree9724db4b1a9a4b2f46342493a7fec445c9b73d98 /media
parent27ce49ff57e36f0472f1675d0ff2b3a5f6aebbcc (diff)
parent7c8708046117e03c0d38006bdd9685139df3ac6b (diff)
downloadframeworks_av-9bb23ff63e17b91a8480b142975067e8b6ab69ec.zip
frameworks_av-9bb23ff63e17b91a8480b142975067e8b6ab69ec.tar.gz
frameworks_av-9bb23ff63e17b91a8480b142975067e8b6ab69ec.tar.bz2
Merge "HLS: faster switching and pause/resume on low buffer"
Diffstat (limited to 'media')
-rw-r--r--media/libmediaplayerservice/nuplayer/HTTPLiveSource.cpp28
-rw-r--r--media/libstagefright/httplive/LiveSession.cpp666
-rw-r--r--media/libstagefright/httplive/LiveSession.h50
-rw-r--r--media/libstagefright/httplive/PlaylistFetcher.cpp340
-rw-r--r--media/libstagefright/httplive/PlaylistFetcher.h11
-rw-r--r--media/libstagefright/mpeg2ts/AnotherPacketSource.cpp164
-rw-r--r--media/libstagefright/mpeg2ts/AnotherPacketSource.h7
7 files changed, 945 insertions, 321 deletions
diff --git a/media/libmediaplayerservice/nuplayer/HTTPLiveSource.cpp b/media/libmediaplayerservice/nuplayer/HTTPLiveSource.cpp
index d01e83a..0476c9b 100644
--- a/media/libmediaplayerservice/nuplayer/HTTPLiveSource.cpp
+++ b/media/libmediaplayerservice/nuplayer/HTTPLiveSource.cpp
@@ -281,6 +281,34 @@ void NuPlayer::HTTPLiveSource::onSessionNotify(const sp<AMessage> &msg) {
break;
}
+ case LiveSession::kWhatBufferingStart:
+ {
+ sp<AMessage> notify = dupNotify();
+ notify->setInt32("what", kWhatPauseOnBufferingStart);
+ notify->post();
+ break;
+ }
+
+ case LiveSession::kWhatBufferingEnd:
+ {
+ sp<AMessage> notify = dupNotify();
+ notify->setInt32("what", kWhatResumeOnBufferingEnd);
+ notify->post();
+ break;
+ }
+
+
+ case LiveSession::kWhatBufferingUpdate:
+ {
+ sp<AMessage> notify = dupNotify();
+ int32_t percentage;
+ CHECK(msg->findInt32("percentage", &percentage));
+ notify->setInt32("what", kWhatBufferingUpdate);
+ notify->setInt32("percentage", percentage);
+ notify->post();
+ break;
+ }
+
case LiveSession::kWhatError:
{
break;
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp
index 738f8b6..4ac2fb2 100644
--- a/media/libstagefright/httplive/LiveSession.cpp
+++ b/media/libstagefright/httplive/LiveSession.cpp
@@ -33,6 +33,7 @@
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
+#include <media/stagefright/foundation/AUtils.h>
#include <media/stagefright/DataSource.h>
#include <media/stagefright/FileSource.h>
#include <media/stagefright/MediaErrors.h>
@@ -49,12 +50,6 @@
namespace android {
-// static
-// High water mark to start up switch or report prepared)
-const int64_t LiveSession::kHighWaterMark = 8000000ll;
-const int64_t LiveSession::kMidWaterMark = 5000000ll;
-const int64_t LiveSession::kLowWaterMark = 3000000ll;
-
struct LiveSession::BandwidthEstimator : public RefBase {
BandwidthEstimator();
@@ -119,15 +114,34 @@ bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) {
return true;
}
+//static
+const char *LiveSession::getKeyForStream(StreamType type) {
+ switch (type) {
+ case STREAMTYPE_VIDEO:
+ return "timeUsVideo";
+ case STREAMTYPE_AUDIO:
+ return "timeUsAudio";
+ case STREAMTYPE_SUBTITLES:
+ return "timeUsSubtitle";
+ default:
+ TRESPASS();
+ }
+ return NULL;
+}
+
LiveSession::LiveSession(
const sp<AMessage> &notify, uint32_t flags,
const sp<IMediaHTTPService> &httpService)
: mNotify(notify),
mFlags(flags),
mHTTPService(httpService),
+ mBuffering(false),
mInPreparationPhase(true),
+ mPollBufferingGeneration(0),
+ mPrevBufferPercentage(-1),
mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
mCurBandwidthIndex(-1),
+ mOrigBandwidthIndex(-1),
mLastBandwidthBps(-1ll),
mBandwidthEstimator(new BandwidthEstimator()),
mStreamMask(0),
@@ -139,11 +153,12 @@ LiveSession::LiveSession(
mRealTimeBaseUs(0ll),
mReconfigurationInProgress(false),
mSwitchInProgress(false),
+ mUpSwitchMark(kUpSwitchMark),
+ mDownSwitchMark(kDownSwitchMark),
+ mUpSwitchMargin(kUpSwitchMargin),
mFirstTimeUsValid(false),
mFirstTimeUs(0),
- mLastSeekTimeUs(0),
- mPollBufferingGeneration(0) {
-
+ mLastSeekTimeUs(0) {
mStreams[kAudioIndex] = StreamItem("audio");
mStreams[kVideoIndex] = StreamItem("video");
mStreams[kSubtitleIndex] = StreamItem("subtitles");
@@ -162,12 +177,6 @@ LiveSession::~LiveSession() {
status_t LiveSession::dequeueAccessUnit(
StreamType stream, sp<ABuffer> *accessUnit) {
- if (!(mStreamMask & stream)) {
- // return -EWOULDBLOCK to avoid halting the decoder
- // when switching between audio/video and audio only.
- return -EWOULDBLOCK;
- }
-
status_t finalResult = OK;
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
@@ -225,26 +234,6 @@ status_t LiveSession::dequeueAccessUnit(
streamStr,
type,
extra == NULL ? "NULL" : extra->debugString().c_str());
-
- size_t seq = strm.mCurDiscontinuitySeq;
- int64_t offsetTimeUs;
- if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
- offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
- } else {
- offsetTimeUs = 0;
- }
-
- seq += 1;
- if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
- int64_t firstTimeUs;
- firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
- offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
- offsetTimeUs += strm.mLastSampleDurationUs;
- } else {
- offsetTimeUs += strm.mLastSampleDurationUs;
- }
-
- mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
} else if (err == OK) {
if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
@@ -252,7 +241,26 @@ status_t LiveSession::dequeueAccessUnit(
int32_t discontinuitySeq = 0;
CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
(*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
- strm.mCurDiscontinuitySeq = discontinuitySeq;
+ if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) {
+ int64_t offsetTimeUs;
+ if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
+ offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq);
+ } else {
+ offsetTimeUs = 0;
+ }
+
+ if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
+ int64_t firstTimeUs;
+ firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
+ offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
+ offsetTimeUs += strm.mLastSampleDurationUs;
+ } else {
+ offsetTimeUs += strm.mLastSampleDurationUs;
+ }
+
+ mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs);
+ strm.mCurDiscontinuitySeq = discontinuitySeq;
+ }
int32_t discard = 0;
int64_t firstTimeUs;
@@ -317,6 +325,11 @@ status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
return -EAGAIN;
}
+ if (stream == STREAMTYPE_AUDIO) {
+ // set AAC input buffer size to 32K bytes (256kbps x 1sec)
+ meta->setInt32(kKeyMaxInputSize, 32 * 1024);
+ }
+
return convertMetaDataToMessage(meta, format);
}
@@ -357,6 +370,102 @@ status_t LiveSession::seekTo(int64_t timeUs) {
return err;
}
+bool LiveSession::checkSwitchProgress(
+ sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) {
+ AString newUri;
+ CHECK(stopParams->findString("uri", &newUri));
+
+ *needResumeUntil = false;
+ sp<AMessage> firstNewMeta[kMaxStreams];
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ StreamType stream = indexToType(i);
+ if (!(mSwapMask & mNewStreamMask & stream)
+ || (mStreams[i].mNewUri != newUri)) {
+ continue;
+ }
+ if (stream == STREAMTYPE_SUBTITLES) {
+ continue;
+ }
+ sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i);
+
+ // First, get latest dequeued meta, which is where the decoder is at.
+ // (when upswitching, we take the meta after a certain delay, so that
+ // the decoder is left with some cushion)
+ sp<AMessage> lastDequeueMeta, lastEnqueueMeta;
+ if (delayUs > 0) {
+ lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs);
+ } else {
+ lastDequeueMeta = source->getLatestDequeuedMeta();
+ }
+ // Then, trim off packets at beginning of mPacketSources2 that's before
+ // the latest dequeued time. These samples are definitely too late.
+ int64_t lastTimeUs, startTimeUs;
+ int32_t lastSeq, startSeq;
+ if (lastDequeueMeta != NULL) {
+ CHECK(lastDequeueMeta->findInt64("timeUs", &lastTimeUs));
+ CHECK(lastDequeueMeta->findInt32("discontinuitySeq", &lastSeq));
+ firstNewMeta[i] = mPacketSources2.editValueAt(i)
+ ->trimBuffersBeforeTimeUs(lastSeq, lastTimeUs);
+ }
+ // Now firstNewMeta[i] is the first sample after the trim.
+ // If it's NULL, we failed because dequeue already past all samples
+ // in mPacketSource2, we have to try again.
+ if (firstNewMeta[i] == NULL) {
+ ALOGV("[%s] dequeue time (%d, %lld) past start time",
+ stream == STREAMTYPE_AUDIO ? "audio" : "video",
+ lastSeq, (long long) lastTimeUs);
+ return false;
+ }
+
+ // Otherwise, we check if mPacketSources2 overlaps with what old fetcher
+ // already fetched, and see if we need to resumeUntil
+ lastEnqueueMeta = source->getLatestEnqueuedMeta();
+ // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity
+ // boundary, no need to resume as the content will look different anyways
+ if (lastEnqueueMeta != NULL) {
+ CHECK(lastEnqueueMeta->findInt64("timeUs", &lastTimeUs));
+ CHECK(lastEnqueueMeta->findInt32("discontinuitySeq", &lastSeq));
+ CHECK(firstNewMeta[i]->findInt64("timeUs", &startTimeUs));
+ CHECK(firstNewMeta[i]->findInt32("discontinuitySeq", &startSeq));
+
+ // no need to resume old fetcher if new fetcher started in different
+ // discontinuity sequence, as the content will look different.
+ *needResumeUntil |=
+ (startSeq == lastSeq
+ && startTimeUs - lastTimeUs > 100000ll);
+
+ // update the stopTime for resumeUntil, as we might have removed some
+ // packets from the head in mPacketSource2
+ stopParams->setInt64(getKeyForStream(stream), startTimeUs);
+ }
+ }
+
+ // if we're here, it means dequeue progress hasn't passed some samples in
+ // mPacketSource2, we can trim off the excess in mPacketSource.
+ // (old fetcher might still need to resumeUntil the start time of new fetcher)
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ StreamType stream = indexToType(i);
+ if (!(mSwapMask & mNewStreamMask & stream)
+ || (newUri != mStreams[i].mNewUri)) {
+ continue;
+ }
+ if (stream == STREAMTYPE_SUBTITLES) {
+ continue;
+ }
+ int64_t startTimeUs;
+ int32_t startSeq;
+ CHECK(firstNewMeta[i] != NULL);
+ CHECK(firstNewMeta[i]->findInt64("timeUs", &startTimeUs));
+ CHECK(firstNewMeta[i]->findInt32("discontinuitySeq", &startSeq));
+ mPacketSources.valueFor(stream)->trimBuffersAfterTimeUs(startSeq, startTimeUs);
+ }
+
+ // no resumeUntil if already underflow
+ *needResumeUntil &= !mBuffering;
+
+ return true;
+}
+
void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatConnect:
@@ -412,8 +521,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
}
if (what == PlaylistFetcher::kWhatStopped) {
- tryToFinishBandwidthSwitch(uri);
-
mFetcherLooper->unregisterHandler(
mFetcherInfos[index].mFetcher->id());
mFetcherInfos.removeItemsAt(index);
@@ -452,6 +559,16 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
+ case PlaylistFetcher::kWhatTargetDurationUpdate:
+ {
+ int64_t targetDurationUs;
+ CHECK(msg->findInt64("targetDurationUs", &targetDurationUs));
+ mUpSwitchMark = min(kUpSwitchMark, targetDurationUs * 3);
+ mDownSwitchMark = min(kDownSwitchMark, targetDurationUs * 9 / 4);
+ mUpSwitchMargin = min(kUpSwitchMargin, targetDurationUs);
+ break;
+ }
+
case PlaylistFetcher::kWhatError:
{
status_t err;
@@ -489,10 +606,23 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
mPacketSources.valueFor(
STREAMTYPE_SUBTITLES)->signalEOS(err);
- sp<AMessage> notify = mNotify->dup();
- notify->setInt32("what", kWhatError);
- notify->setInt32("err", err);
- notify->post();
+ postError(err);
+ break;
+ }
+
+ case PlaylistFetcher::kWhatStopReached:
+ {
+ ALOGV("kWhatStopReached");
+
+ AString uri;
+ CHECK(msg->findString("uri", &uri));
+
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index < 0) {
+ break;
+ }
+
+ tryToFinishBandwidthSwitch(uri);
break;
}
@@ -507,20 +637,67 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
AString uri;
CHECK(msg->findString("uri", &uri));
+
+ // mark new fetcher mToBeResumed
ssize_t index = mFetcherInfos.indexOfKey(uri);
if (index >= 0) {
mFetcherInfos.editValueAt(index).mToBeResumed = true;
}
- // Resume fetcher for the original variant; the resumed fetcher should
- // continue until the timestamps found in msg, which is stored by the
- // new fetcher to indicate where the new variant has started buffering.
- for (size_t i = 0; i < mFetcherInfos.size(); i++) {
- const FetcherInfo info = mFetcherInfos.valueAt(i);
- if (info.mToBeRemoved) {
- info.mFetcher->resumeUntilAsync(msg);
+ // temporarily disable packet sources to be swapped to prevent
+ // NuPlayerDecoder from dequeuing while we check progress
+ for (size_t i = 0; i < mPacketSources.size(); ++i) {
+ if ((mSwapMask & mPacketSources.keyAt(i))
+ && uri == mStreams[i].mNewUri) {
+ mPacketSources.editValueAt(i)->enable(false);
}
}
+ bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex);
+ // If switching up, require a cushion bigger than kUnderflowMark
+ // to avoid buffering immediately after the switch.
+ // (If we don't have that cushion we'd rather cancel and try again.)
+ int64_t delayUs = switchUp ? (kUnderflowMark + 1000000ll) : 0;
+ bool needResumeUntil = false;
+ sp<AMessage> stopParams = msg;
+ if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) {
+ // playback time hasn't passed startAt time
+ if (!needResumeUntil) {
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if ((mSwapMask & indexToType(i))
+ && uri == mStreams[i].mNewUri) {
+ // have to make a copy of mStreams[i].mUri because
+ // tryToFinishBandwidthSwitch is modifying mStreams[]
+ AString oldURI = mStreams[i].mUri;
+ tryToFinishBandwidthSwitch(oldURI);
+ break;
+ }
+ }
+ } else {
+ // startAt time is after last enqueue time
+ // Resume fetcher for the original variant; the resumed fetcher should
+ // continue until the timestamps found in msg, which is stored by the
+ // new fetcher to indicate where the new variant has started buffering.
+ for (size_t i = 0; i < mFetcherInfos.size(); i++) {
+ const FetcherInfo &info = mFetcherInfos.valueAt(i);
+ if (info.mToBeRemoved) {
+ info.mFetcher->resumeUntilAsync(stopParams);
+ }
+ }
+ }
+ } else {
+ // playback time passed startAt time
+ if (switchUp) {
+ // if switching up, cancel and retry if condition satisfies again
+ cancelBandwidthSwitch(true /* resume */);
+ } else {
+ resumeFetcher(uri, mSwapMask, -1, true /* newUri */);
+ }
+ }
+ // re-enable all packet sources
+ for (size_t i = 0; i < mPacketSources.size(); ++i) {
+ mPacketSources.editValueAt(i)->enable(true);
+ }
+
break;
}
@@ -688,8 +865,6 @@ void LiveSession::onConnect(const sp<AMessage> &msg) {
mPlaylist->pickRandomMediaItems();
changeConfiguration(
0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
-
- schedulePollBuffering();
}
void LiveSession::finishDisconnect() {
@@ -950,6 +1125,43 @@ static double uniformRand() {
}
#endif
+bool LiveSession::resumeFetcher(
+ const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) {
+ ssize_t index = mFetcherInfos.indexOfKey(uri);
+ if (index < 0) {
+ ALOGE("did not find fetcher for uri: %s", uri.c_str());
+ return false;
+ }
+
+ bool resume = false;
+ sp<AnotherPacketSource> sources[kMaxStreams];
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if ((streamMask & indexToType(i))
+ && ((!newUri && uri == mStreams[i].mUri)
+ || (newUri && uri == mStreams[i].mNewUri))) {
+ resume = true;
+ if (newUri) {
+ sources[i] = mPacketSources2.valueFor(indexToType(i));
+ sources[i]->clear();
+ } else {
+ sources[i] = mPacketSources.valueFor(indexToType(i));
+ }
+ }
+ }
+
+ if (resume) {
+ ALOGV("resuming fetcher %s, timeUs %lld", uri.c_str(), (long long)timeUs);
+ SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition;
+ mFetcherInfos.editValueAt(index).mFetcher->startAsync(
+ sources[kAudioIndex],
+ sources[kVideoIndex],
+ sources[kSubtitleIndex],
+ timeUs, -1, -1, seekMode);
+ }
+
+ return resume;
+}
+
float LiveSession::getAbortThreshold(
ssize_t currentBWIndex, ssize_t targetBWIndex) const {
float abortThreshold = -1.0f;
@@ -983,12 +1195,17 @@ float LiveSession::getAbortThreshold(
X/T < bw1 / (bw1 + bw0 - bw)
*/
+ // Taking the measured current bandwidth at 50% face value only,
+ // as our bandwidth estimation is a lagging indicator. Being
+ // conservative on this, we prefer switching to lower bandwidth
+ // unless we're really confident finishing up the last segment
+ // of higher bandwidth will be fast.
CHECK(mLastBandwidthBps >= 0);
abortThreshold =
(float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
/ ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
+ (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
- - (float)mLastBandwidthBps * 0.7f);
+ - (float)mLastBandwidthBps * 0.5f);
if (abortThreshold < 0.0f) {
abortThreshold = -1.0f; // do not abort
}
@@ -1128,7 +1345,7 @@ status_t LiveSession::onSeek(const sp<AMessage> &msg) {
CHECK(msg->findInt64("timeUs", &timeUs));
if (!mReconfigurationInProgress) {
- changeConfiguration(timeUs, mCurBandwidthIndex);
+ changeConfiguration(timeUs);
return OK;
} else {
return -EWOULDBLOCK;
@@ -1184,7 +1401,6 @@ status_t LiveSession::selectTrack(size_t index, bool select) {
status_t err = mPlaylist->selectTrack(index, select);
if (err == OK) {
sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
- msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
msg->setInt32("pickTrack", select);
msg->post();
}
@@ -1200,19 +1416,17 @@ ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
}
void LiveSession::changeConfiguration(
- int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
- // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
- // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
+ int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
cancelBandwidthSwitch();
CHECK(!mReconfigurationInProgress);
mReconfigurationInProgress = true;
-
- ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
- timeUs, bandwidthIndex, pickTrack);
-
- CHECK_LT(bandwidthIndex, mBandwidthItems.size());
- const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
+ if (bandwidthIndex >= 0) {
+ mOrigBandwidthIndex = mCurBandwidthIndex;
+ mCurBandwidthIndex = bandwidthIndex;
+ }
+ CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size());
+ const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);
uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
@@ -1227,6 +1441,12 @@ void LiveSession::changeConfiguration(
// Step 1, stop and discard fetchers that are no longer needed.
// Pause those that we'll reuse.
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
+ // skip fetchers that are marked mToBeRemoved,
+ // these are done and can't be reused
+ if (mFetcherInfos[i].mToBeRemoved) {
+ continue;
+ }
+
const AString &uri = mFetcherInfos.keyAt(i);
bool discardFetcher = true;
@@ -1255,7 +1475,7 @@ void LiveSession::changeConfiguration(
} else if (!pickTrack) {
// adapting, abort if remaining of current segment is over threshold
threshold = getAbortThreshold(
- mCurBandwidthIndex, bandwidthIndex);
+ mOrigBandwidthIndex, mCurBandwidthIndex);
}
ALOGV("Pausing with threshold %.3f", threshold);
@@ -1264,8 +1484,6 @@ void LiveSession::changeConfiguration(
}
}
- mCurBandwidthIndex = bandwidthIndex;
-
sp<AMessage> msg;
if (timeUs < 0ll) {
// skip onChangeConfiguration2 (decoder destruction) if not seeking.
@@ -1297,10 +1515,9 @@ void LiveSession::changeConfiguration(
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
if (!mReconfigurationInProgress) {
- int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
+ int32_t pickTrack = 0;
msg->findInt32("pickTrack", &pickTrack);
- msg->findInt32("bandwidthIndex", &bandwidthIndex);
- changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
+ changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
} else {
msg->post(1000000ll); // retry in 1 sec
}
@@ -1323,6 +1540,10 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
mPacketSources.editValueAt(i)->clear();
}
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ mStreams[i].mCurDiscontinuitySeq = 0;
+ }
+
mDiscontinuityOffsetTimesUs.clear();
mDiscontinuityAbsStartTimesUs.clear();
@@ -1333,6 +1554,10 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
mSeekReplyID.clear();
mSeekReply.clear();
}
+
+ // restart buffer polling after seek becauese previous
+ // buffering position is no longer valid.
+ restartPollBuffering();
}
uint32_t streamMask, resumeMask;
@@ -1407,12 +1632,14 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
int64_t timeUs;
int32_t pickTrack;
bool switching = false;
+ bool finishSwitching = false;
CHECK(msg->findInt64("timeUs", &timeUs));
CHECK(msg->findInt32("pickTrack", &pickTrack));
if (timeUs < 0ll) {
if (!pickTrack) {
switching = true;
+ finishSwitching = (streamMask == 0);
}
mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
} else {
@@ -1440,20 +1667,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
const AString &uri = mFetcherInfos.keyAt(i);
-
- sp<AnotherPacketSource> sources[kMaxStreams];
- for (size_t j = 0; j < kMaxStreams; ++j) {
- if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
- sources[j] = mPacketSources.valueFor(indexToType(j));
- }
- }
- FetcherInfo &info = mFetcherInfos.editValueAt(i);
- if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
- || sources[kSubtitleIndex] != NULL) {
- info.mFetcher->startAsync(
- sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], timeUs);
- } else {
- info.mToBeRemoved = true;
+ if (!resumeFetcher(uri, resumeMask, timeUs)) {
+ mFetcherInfos.editValueAt(i).mToBeRemoved = true;
}
}
@@ -1512,24 +1727,42 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
} else {
// adapting
meta = sources[j]->getLatestEnqueuedMeta();
+ if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
+ // switching up
+ meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
+ }
}
- if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
+ if (j != kSubtitleIndex
+ && meta != NULL
+ && !meta->findInt32("discontinuity", &type)) {
int64_t tmpUs;
int64_t tmpSegmentUs;
+ int32_t seq;
CHECK(meta->findInt64("timeUs", &tmpUs));
CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs));
- if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) {
+ CHECK(meta->findInt32("discontinuitySeq", &seq));
+ // If we're switching and looking for next sample or segment, set the target
+ // segment start time to tmpSegmentUs + tmpDurationUs / 2, which is
+ // the middle point of the segment where the last sample was.
+ // This is needed if segments of the two variants are not perfectly
+ // aligned. (If the corresponding segment in new variant starts slightly
+ // later than that in the old variant, we still want the switching to
+ // start in the next one, not the current one)
+ if (mStreams[j].mSeekMode == kSeekModeNextSample
+ || mStreams[j].mSeekMode == kSeekModeNextSegment) {
+ int64_t tmpDurationUs;
+ CHECK(meta->findInt64("segmentDurationUs", &tmpDurationUs));
+ tmpSegmentUs += tmpDurationUs / 2;
+ }
+ if (startTimeUs < 0 || seq > discontinuitySeq
+ || (seq == discontinuitySeq
+ && (tmpSegmentUs > segmentStartTimeUs
+ || (tmpSegmentUs == segmentStartTimeUs
+ && tmpUs > startTimeUs)))) {
startTimeUs = tmpUs;
segmentStartTimeUs = tmpSegmentUs;
- } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) {
- startTimeUs = tmpUs;
- }
-
- int32_t seq;
- CHECK(meta->findInt32("discontinuitySeq", &seq));
- if (discontinuitySeq < 0 || seq < discontinuitySeq) {
discontinuitySeq = seq;
}
}
@@ -1585,8 +1818,21 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
mReconfigurationInProgress = false;
if (switching) {
mSwitchInProgress = true;
+
+ if (finishSwitching) {
+ // Switch is finished now, no new fetchers are created.
+ // This path is hit when old variant had video and audio from
+ // two separate fetchers, while new variant has audio only,
+ // which reuses the previous audio fetcher.
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if (mSwapMask & indexToType(i)) {
+ tryToFinishBandwidthSwitch(mStreams[i].mUri);
+ }
+ }
+ }
} else {
mStreamMask = mNewStreamMask;
+ mOrigBandwidthIndex = mCurBandwidthIndex;
}
if (mDisconnectReplyID != NULL) {
@@ -1614,21 +1860,20 @@ void LiveSession::swapPacketSource(StreamType stream) {
aps2->clear();
}
-void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) {
+void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
if (!mSwitchInProgress) {
return;
}
- ssize_t index = mFetcherInfos.indexOfKey(uri);
+ ssize_t index = mFetcherInfos.indexOfKey(oldUri);
if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
return;
}
// Swap packet source of streams provided by old variant
for (size_t idx = 0; idx < kMaxStreams; idx++) {
- if (uri == mStreams[idx].mUri) {
- StreamType stream = indexToType(idx);
-
+ StreamType stream = indexToType(idx);
+ if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
swapPacketSource(stream);
if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
@@ -1642,7 +1887,7 @@ void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) {
}
}
- mFetcherInfos.editValueAt(index).mToBeRemoved = false;
+ mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);
ALOGV("tryToFinishBandwidthSwitch: mSwapMask=%x", mSwapMask);
if (mSwapMask != 0) {
@@ -1672,30 +1917,20 @@ void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) {
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
FetcherInfo &info = mFetcherInfos.editValueAt(i);
if (info.mToBeResumed) {
- const AString &uri = mFetcherInfos.keyAt(i);
- sp<AnotherPacketSource> sources[kMaxStreams];
- for (size_t j = 0; j < kMaxStreams; ++j) {
- if (uri == mStreams[j].mUri) {
- sources[j] = mPacketSources.valueFor(indexToType(j));
- }
- }
- if (sources[kAudioIndex] != NULL
- || sources[kVideoIndex] != NULL
- || sources[kSubtitleIndex] != NULL) {
- ALOGV("resuming fetcher %s", uri.c_str());
- info.mFetcher->startAsync(
- sources[kAudioIndex],
- sources[kVideoIndex],
- sources[kSubtitleIndex]);
- }
+ resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
info.mToBeResumed = false;
}
}
+ ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
+ mOrigBandwidthIndex, mCurBandwidthIndex);
+
mStreamMask = mNewStreamMask;
mSwitchInProgress = false;
+ mOrigBandwidthIndex = mCurBandwidthIndex;
- ALOGI("#### Finished Bandwidth Switch");
+
+ restartPollBuffering();
}
void LiveSession::schedulePollBuffering() {
@@ -1706,6 +1941,12 @@ void LiveSession::schedulePollBuffering() {
void LiveSession::cancelPollBuffering() {
++mPollBufferingGeneration;
+ mPrevBufferPercentage = -1;
+}
+
+void LiveSession::restartPollBuffering() {
+ cancelPollBuffering();
+ onPollBuffering();
}
void LiveSession::onPollBuffering() {
@@ -1714,70 +1955,90 @@ void LiveSession::onPollBuffering() {
mSwitchInProgress, mReconfigurationInProgress,
mInPreparationPhase, mCurBandwidthIndex, mStreamMask);
- bool low, mid, high;
- if (checkBuffering(low, mid, high)) {
- if (mInPreparationPhase && mid) {
+ bool underflow, ready, down, up;
+ if (checkBuffering(underflow, ready, down, up)) {
+ if (mInPreparationPhase && ready) {
postPrepared(OK);
}
// don't switch before we report prepared
if (!mInPreparationPhase) {
- switchBandwidthIfNeeded(high, !mid);
- }
+ if (ready) {
+ stopBufferingIfNecessary();
+ } else if (underflow) {
+ startBufferingIfNecessary();
+ }
+ switchBandwidthIfNeeded(up, down);
+ }
+
}
schedulePollBuffering();
}
-void LiveSession::cancelBandwidthSwitch() {
- ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++", mSwitchGeneration);
-
- mSwitchGeneration++;
- mSwitchInProgress = false;
- mSwapMask = 0;
+void LiveSession::cancelBandwidthSwitch(bool resume) {
+ ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd",
+ mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex);
+ if (!mSwitchInProgress) {
+ return;
+ }
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
FetcherInfo& info = mFetcherInfos.editValueAt(i);
if (info.mToBeRemoved) {
info.mToBeRemoved = false;
+ if (resume) {
+ resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask);
+ }
}
}
for (size_t i = 0; i < kMaxStreams; ++i) {
- if (!mStreams[i].mNewUri.empty()) {
- ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
- if (j < 0) {
- mStreams[i].mNewUri.clear();
+ AString newUri = mStreams[i].mNewUri;
+ if (!newUri.empty()) {
+ // clear all mNewUri matching this newUri
+ for (size_t j = i; j < kMaxStreams; ++j) {
+ if (mStreams[j].mNewUri == newUri) {
+ mStreams[j].mNewUri.clear();
+ }
+ }
+ ALOGV("stopping newUri = %s", newUri.c_str());
+ ssize_t index = mFetcherInfos.indexOfKey(newUri);
+ if (index < 0) {
+ ALOGE("did not find fetcher for newUri: %s", newUri.c_str());
continue;
}
-
- const FetcherInfo &info = mFetcherInfos.valueAt(j);
+ FetcherInfo &info = mFetcherInfos.editValueAt(index);
+ info.mToBeRemoved = true;
info.mFetcher->stopAsync();
- mFetcherInfos.removeItemsAt(j);
- mStreams[i].mNewUri.clear();
}
}
+
+ ALOGI("#### Canceled Bandwidth Switch: %zd => %zd",
+ mCurBandwidthIndex, mOrigBandwidthIndex);
+
+ mSwitchGeneration++;
+ mSwitchInProgress = false;
+ mCurBandwidthIndex = mOrigBandwidthIndex;
+ mSwapMask = 0;
}
-bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) {
- low = mid = high = false;
+bool LiveSession::checkBuffering(
+ bool &underflow, bool &ready, bool &down, bool &up) {
+ underflow = ready = down = up = false;
- if (mSwitchInProgress || mReconfigurationInProgress) {
+ if (mReconfigurationInProgress) {
ALOGV("Switch/Reconfig in progress, defer buffer polling");
return false;
}
- // TODO: Fine tune low/high mark.
- // We also need to pause playback if buffering is too low.
- // Currently during underflow, we depend on decoder to starve
- // to pause, but A/V could have different buffering left,
- // they're not paused together.
- // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE
-
- // Switch down if any of the fetchers are below low mark;
- // Switch up if all of the fetchers are over high mark.
- size_t activeCount, lowCount, midCount, highCount;
- activeCount = lowCount = midCount = highCount = 0;
+ size_t activeCount, underflowCount, readyCount, downCount, upCount;
+ activeCount = underflowCount = readyCount = downCount = upCount =0;
+ int32_t minBufferPercent = -1;
+ int64_t durationUs;
+ if (getDuration(&durationUs) != OK) {
+ durationUs = -1;
+ }
for (size_t i = 0; i < mPacketSources.size(); ++i) {
// we don't check subtitles for buffering level
if (!(mStreamMask & mPacketSources.keyAt(i)
@@ -1791,34 +2052,99 @@ bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) {
continue;
}
- ++activeCount;
int64_t bufferedDurationUs =
mPacketSources[i]->getEstimatedDurationUs();
ALOGV("source[%zu]: buffered %lld us", i, (long long)bufferedDurationUs);
- if (bufferedDurationUs < kLowWaterMark) {
- ++lowCount;
- break;
- } else if (bufferedDurationUs > kHighWaterMark) {
- ++midCount;
- ++highCount;
- } else if (bufferedDurationUs > kMidWaterMark) {
- ++midCount;
+ if (durationUs >= 0) {
+ int32_t percent;
+ if (mPacketSources[i]->isFinished(0 /* duration */)) {
+ percent = 100;
+ } else {
+ percent = (int32_t)(100.0 * (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs);
+ }
+ if (minBufferPercent < 0 || percent < minBufferPercent) {
+ minBufferPercent = percent;
+ }
}
+
+ ++activeCount;
+ int64_t readyMark = mInPreparationPhase ? kPrepareMark : kReadyMark;
+ if (bufferedDurationUs > readyMark
+ || mPacketSources[i]->isFinished(0)) {
+ ++readyCount;
+ }
+ if (!mPacketSources[i]->isFinished(0)) {
+ if (bufferedDurationUs < kUnderflowMark) {
+ ++underflowCount;
+ }
+ if (bufferedDurationUs > mUpSwitchMark) {
+ ++upCount;
+ } else if (bufferedDurationUs < mDownSwitchMark) {
+ ++downCount;
+ }
+ }
+ }
+
+ if (minBufferPercent >= 0) {
+ notifyBufferingUpdate(minBufferPercent);
}
if (activeCount > 0) {
- high = (highCount == activeCount);
- mid = (midCount == activeCount);
- low = (lowCount > 0);
+ up = (upCount == activeCount);
+ down = (downCount > 0);
+ ready = (readyCount == activeCount);
+ underflow = (underflowCount > 0);
return true;
}
return false;
}
+void LiveSession::startBufferingIfNecessary() {
+ ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
+ mInPreparationPhase, mBuffering);
+ if (!mBuffering) {
+ mBuffering = true;
+
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatBufferingStart);
+ notify->post();
+ }
+}
+
+void LiveSession::stopBufferingIfNecessary() {
+ ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d",
+ mInPreparationPhase, mBuffering);
+
+ if (mBuffering) {
+ mBuffering = false;
+
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatBufferingEnd);
+ notify->post();
+ }
+}
+
+void LiveSession::notifyBufferingUpdate(int32_t percentage) {
+ if (percentage < mPrevBufferPercentage) {
+ percentage = mPrevBufferPercentage;
+ } else if (percentage > 100) {
+ percentage = 100;
+ }
+
+ mPrevBufferPercentage = percentage;
+
+ ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage);
+
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatBufferingUpdate);
+ notify->setInt32("percentage", percentage);
+ notify->post();
+}
+
void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
// no need to check bandwidth if we only have 1 bandwidth settings
- if (mBandwidthItems.size() < 2) {
+ if (mSwitchInProgress || mBandwidthItems.size() < 2) {
return;
}
@@ -1850,6 +2176,22 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
}
}
+void LiveSession::postError(status_t err) {
+ // if we reached EOS, notify buffering of 100%
+ if (err == ERROR_END_OF_STREAM) {
+ notifyBufferingUpdate(100);
+ }
+ // we'll stop buffer polling now, before that notify
+ // stop buffering to stop the spinning icon
+ stopBufferingIfNecessary();
+ cancelPollBuffering();
+
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatError);
+ notify->setInt32("err", err);
+ notify->post();
+}
+
void LiveSession::postPrepared(status_t err) {
CHECK(mInPreparationPhase);
@@ -1857,6 +2199,8 @@ void LiveSession::postPrepared(status_t err) {
if (err == OK || err == ERROR_END_OF_STREAM) {
notify->setInt32("what", kWhatPrepared);
} else {
+ cancelPollBuffering();
+
notify->setInt32("what", kWhatPreparationFailed);
notify->setInt32("err", err);
}
diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h
index cbf988e..d11675b 100644
--- a/media/libstagefright/httplive/LiveSession.h
+++ b/media/libstagefright/httplive/LiveSession.h
@@ -89,11 +89,16 @@ struct LiveSession : public AHandler {
bool isSeekable() const;
bool hasDynamicDuration() const;
+ static const char *getKeyForStream(StreamType type);
+
enum {
kWhatStreamsChanged,
kWhatError,
kWhatPrepared,
kWhatPreparationFailed,
+ kWhatBufferingStart,
+ kWhatBufferingEnd,
+ kWhatBufferingUpdate,
};
protected:
@@ -116,9 +121,15 @@ private:
kWhatPollBuffering = 'poll',
};
- static const int64_t kHighWaterMark;
- static const int64_t kMidWaterMark;
- static const int64_t kLowWaterMark;
+ // Bandwidth Switch Mark Defaults
+ static const int64_t kUpSwitchMark = 25000000ll;
+ static const int64_t kDownSwitchMark = 18000000ll;
+ static const int64_t kUpSwitchMargin = 5000000ll;
+
+ // Buffer Prepare/Ready/Underflow Marks
+ static const int64_t kReadyMark = 5000000ll;
+ static const int64_t kPrepareMark = 1500000ll;
+ static const int64_t kUnderflowMark = 1000000ll;
struct BandwidthEstimator;
struct BandwidthItem {
@@ -160,8 +171,10 @@ private:
uint32_t mFlags;
sp<IMediaHTTPService> mHTTPService;
+ bool mBuffering;
bool mInPreparationPhase;
- bool mBuffering[kMaxStreams];
+ int32_t mPollBufferingGeneration;
+ int32_t mPrevBufferPercentage;
sp<HTTPBase> mHTTPDataSource;
KeyedVector<String8, String8> mExtraHeaders;
@@ -170,6 +183,7 @@ private:
Vector<BandwidthItem> mBandwidthItems;
ssize_t mCurBandwidthIndex;
+ ssize_t mOrigBandwidthIndex;
int32_t mLastBandwidthBps;
sp<BandwidthEstimator> mBandwidthEstimator;
@@ -204,6 +218,10 @@ private:
bool mReconfigurationInProgress;
bool mSwitchInProgress;
+ int64_t mUpSwitchMark;
+ int64_t mDownSwitchMark;
+ int64_t mUpSwitchMargin;
+
sp<AReplyToken> mDisconnectReplyID;
sp<AReplyToken> mSeekReplyID;
@@ -213,8 +231,6 @@ private:
KeyedVector<size_t, int64_t> mDiscontinuityAbsStartTimesUs;
KeyedVector<size_t, int64_t> mDiscontinuityOffsetTimesUs;
- int32_t mPollBufferingGeneration;
-
sp<PlaylistFetcher> addFetcher(const char *uri);
void onConnect(const sp<AMessage> &msg);
@@ -247,6 +263,10 @@ private:
sp<M3UParser> fetchPlaylist(
const char *url, uint8_t *curPlaylistHash, bool *unchanged);
+ bool resumeFetcher(
+ const AString &uri, uint32_t streamMask,
+ int64_t timeUs = -1ll, bool newUri = false);
+
float getAbortThreshold(
ssize_t currentBWIndex, ssize_t targetBWIndex) const;
void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
@@ -258,24 +278,32 @@ private:
static ssize_t typeToIndex(int32_t type);
void changeConfiguration(
- int64_t timeUs, size_t bandwidthIndex, bool pickTrack = false);
+ int64_t timeUs, ssize_t bwIndex = -1, bool pickTrack = false);
void onChangeConfiguration(const sp<AMessage> &msg);
void onChangeConfiguration2(const sp<AMessage> &msg);
void onChangeConfiguration3(const sp<AMessage> &msg);
+
void swapPacketSource(StreamType stream);
- void tryToFinishBandwidthSwitch(const AString &uri);
+ void tryToFinishBandwidthSwitch(const AString &oldUri);
+ void cancelBandwidthSwitch(bool resume = false);
+ bool checkSwitchProgress(
+ sp<AMessage> &msg, int64_t delayUs, bool *needResumeUntil);
- void cancelBandwidthSwitch();
+ void switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow);
void schedulePollBuffering();
void cancelPollBuffering();
+ void restartPollBuffering();
void onPollBuffering();
- bool checkBuffering(bool &low, bool &mid, bool &high);
- void switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow);
+ bool checkBuffering(bool &underflow, bool &ready, bool &down, bool &up);
+ void startBufferingIfNecessary();
+ void stopBufferingIfNecessary();
+ void notifyBufferingUpdate(int32_t percentage);
void finishDisconnect();
void postPrepared(status_t err);
+ void postError(status_t err);
DISALLOW_EVIL_CONSTRUCTORS(LiveSession);
};
diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp
index 68f0357..3ace396 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.cpp
+++ b/media/libstagefright/httplive/PlaylistFetcher.cpp
@@ -33,6 +33,7 @@
#include <media/stagefright/foundation/ABitReader.h>
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
+#include <media/stagefright/foundation/AUtils.h>
#include <media/stagefright/foundation/hexdump.h>
#include <media/stagefright/FileSource.h>
#include <media/stagefright/MediaDefs.h>
@@ -47,7 +48,7 @@
namespace android {
// static
-const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
+const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
const int64_t PlaylistFetcher::kFetcherResumeThreshold = 100000ll;
// LCM of 188 (size of a TS packet) & 1k works well
@@ -157,8 +158,8 @@ PlaylistFetcher::PlaylistFetcher(
mSeqNumber(-1),
mNumRetries(0),
mStartup(true),
+ mIDRFound(false),
mSeekMode(LiveSession::kSeekModeExactPosition),
- mPrepared(false),
mTimeChangeSignaled(false),
mNextPTSTimeUs(-1ll),
mMonitorQueueGeneration(0),
@@ -208,6 +209,32 @@ int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
return segmentStartUs;
}
+int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
+ CHECK(mPlaylist != NULL);
+
+ int32_t firstSeqNumberInPlaylist;
+ if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
+ "media-sequence", &firstSeqNumberInPlaylist)) {
+ firstSeqNumberInPlaylist = 0;
+ }
+
+ int32_t lastSeqNumberInPlaylist =
+ firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
+
+ CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
+ CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
+
+ int32_t index = seqNumber - firstSeqNumberInPlaylist;
+ sp<AMessage> itemMeta;
+ CHECK(mPlaylist->itemAt(
+ index, NULL /* uri */, &itemMeta));
+
+ int64_t itemDurationUs;
+ CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
+
+ return itemDurationUs;
+}
+
int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
int64_t nowUs = ALooper::GetNowUs();
@@ -559,7 +586,6 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
mStopParams.clear();
mStartTimeUsNotify = mNotify->dup();
mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
- mStartTimeUsNotify->setInt32("streamMask", 0);
mStartTimeUsNotify->setString("uri", mURI);
uint32_t streamTypeMask;
@@ -604,20 +630,25 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
mStreamTypeMask = streamTypeMask;
mSegmentStartTimeUs = segmentStartTimeUs;
- mDiscontinuitySeq = startDiscontinuitySeq;
+
+ if (startDiscontinuitySeq >= 0) {
+ mDiscontinuitySeq = startDiscontinuitySeq;
+ }
mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
mSeekMode = (LiveSession::SeekMode) seekMode;
+ if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
+ mStartup = true;
+ mIDRFound = false;
+ mVideoBuffer->clear();
+ }
+
if (startTimeUs >= 0) {
mStartTimeUs = startTimeUs;
mFirstPTSValid = false;
mSeqNumber = -1;
- mStartup = true;
- mPrepared = false;
- mIDRFound = false;
mTimeChangeSignaled = false;
- mVideoBuffer->clear();
mDownloadState->resetState();
}
@@ -663,8 +694,7 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
for (size_t i = 0; i < mPacketSources.size(); i++) {
sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
- const char *stopKey;
- int streamType = mPacketSources.keyAt(i);
+ LiveSession::StreamType streamType = mPacketSources.keyAt(i);
if (streamType == LiveSession::STREAMTYPE_SUBTITLES) {
// the subtitle track can always be stopped
@@ -672,18 +702,7 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
continue;
}
- switch (streamType) {
- case LiveSession::STREAMTYPE_VIDEO:
- stopKey = "timeUsVideo";
- break;
-
- case LiveSession::STREAMTYPE_AUDIO:
- stopKey = "timeUsAudio";
- break;
-
- default:
- TRESPASS();
- }
+ const char *stopKey = LiveSession::getKeyForStream(streamType);
// check if this stream has too little data left to be resumed
int32_t discontinuitySeq;
@@ -701,7 +720,7 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
// Don't resume if all streams are within a resume threshold
if (stopCount == mPacketSources.size()) {
- stopAsync(/* clear = */ false);
+ notifyStopReached();
return OK;
}
@@ -711,6 +730,12 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
return OK;
}
+void PlaylistFetcher::notifyStopReached() {
+ sp<AMessage> notify = mNotify->dup();
+ notify->setInt32("what", kWhatStopReached);
+ notify->post();
+}
+
void PlaylistFetcher::notifyError(status_t err) {
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatError);
@@ -730,7 +755,12 @@ void PlaylistFetcher::queueDiscontinuity(
void PlaylistFetcher::onMonitorQueue() {
bool downloadMore = false;
- refreshPlaylist();
+
+ // in the middle of an unfinished download, delay
+ // playlist refresh as it'll change seq numbers
+ if (!mDownloadState->hasSavedState()) {
+ refreshPlaylist();
+ }
int32_t targetDurationSecs;
int64_t targetDurationUs = kMinBufferedDurationUs;
@@ -826,6 +856,13 @@ status_t PlaylistFetcher::refreshPlaylist() {
if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
updateDuration();
}
+ // Notify LiveSession to use target-duration based buffering level
+ // for up/down switch. Default LiveSession::kUpSwitchMark may not
+ // be reachable for live streams, as our max buffering amount is
+ // limited to 3 segments.
+ if (!mPlaylist->isComplete()) {
+ updateTargetDuration();
+ }
}
mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
@@ -838,17 +875,12 @@ bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
return buffer->size() > 0 && buffer->data()[0] == 0x47;
}
-bool PlaylistFetcher::shouldPauseDownload(bool startFound) {
+bool PlaylistFetcher::shouldPauseDownload() {
if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
// doesn't apply to subtitles
return false;
}
- // If we're switching, save state and pause after start point is found
- if (mSeekMode != LiveSession::kSeekModeExactPosition && startFound) {
- return true;
- }
-
// Calculate threshold to abort current download
int32_t targetDurationSecs;
CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
@@ -1020,7 +1052,7 @@ bool PlaylistFetcher::initDownloadState(
// but since the segments we are supposed to fetch have already rolled off
// the playlist, i.e. we have already missed the boat, we inevitably have to
// skip.
- stopAsync(/* clear = */ false);
+ notifyStopReached();
return false;
}
mSeqNumber = lastSeqNumberInPlaylist - 3;
@@ -1031,12 +1063,21 @@ bool PlaylistFetcher::initDownloadState(
// fall through
} else {
- ALOGE("Cannot find sequence number %d in playlist "
- "(contains %d - %d)",
- mSeqNumber, firstSeqNumberInPlaylist,
- firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
+ if (mPlaylist != NULL) {
+ ALOGE("Cannot find sequence number %d in playlist "
+ "(contains %d - %d)",
+ mSeqNumber, firstSeqNumberInPlaylist,
+ firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
+
+ notifyError(ERROR_END_OF_STREAM);
+ } else {
+ // It's possible that we were never able to download the playlist.
+ // In this case we should notify error, instead of EOS, as EOS during
+ // prepare means we succeeded in downloading everything.
+ ALOGE("Failed to download playlist!");
+ notifyError(ERROR_IO);
+ }
- notifyError(ERROR_END_OF_STREAM);
return false;
}
}
@@ -1102,8 +1143,10 @@ bool PlaylistFetcher::initDownloadState(
// Signal a format discontinuity to ATSParser to clear partial data
// from previous streams. Not doing this causes bitstream corruption.
- mTSParser->signalDiscontinuity(
- ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */);
+ if (mTSParser != NULL) {
+ mTSParser->signalDiscontinuity(
+ ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */);
+ }
queueDiscontinuity(
ATSParser::DISCONTINUITY_FORMATCHANGE,
@@ -1166,6 +1209,7 @@ void PlaylistFetcher::onDownloadNext() {
}
// block-wise download
+ bool shouldPause = false;
ssize_t bytesRead;
do {
sp<DataSource> source = mHTTPDataSource;
@@ -1238,21 +1282,34 @@ void PlaylistFetcher::onDownloadNext() {
return;
} else if (err == ERROR_OUT_OF_RANGE) {
// reached stopping point
- stopAsync(/* clear = */ false);
+ notifyStopReached();
return;
} else if (err != OK) {
notifyError(err);
return;
- } else if (bytesRead != 0 &&
- shouldPauseDownload(mStartup != startUp /* startFound */)) {
- mDownloadState->saveState(
- uri,
- itemMeta,
- buffer,
- tsBuffer,
- firstSeqNumberInPlaylist,
- lastSeqNumberInPlaylist);
- return;
+ }
+ // If we're switching, post start notification
+ // this should only be posted when the last chunk is full processed by TSParser
+ if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
+ CHECK(mStartTimeUsNotify != NULL);
+ mStartTimeUsNotify->post();
+ mStartTimeUsNotify.clear();
+ shouldPause = true;
+ }
+ if (shouldPause || shouldPauseDownload()) {
+ // save state and return if this is not the last chunk,
+ // leaving the fetcher in paused state.
+ if (bytesRead != 0) {
+ mDownloadState->saveState(
+ uri,
+ itemMeta,
+ buffer,
+ tsBuffer,
+ firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist);
+ return;
+ }
+ shouldPause = true;
}
} while (bytesRead != 0);
@@ -1290,7 +1347,6 @@ void PlaylistFetcher::onDownloadNext() {
return;
}
- status_t err = OK;
if (tsBuffer != NULL) {
AString method;
CHECK(buffer->meta()->findString("cipher-method", &method));
@@ -1304,30 +1360,40 @@ void PlaylistFetcher::onDownloadNext() {
}
// bulk extract non-ts files
+ bool startUp = mStartup;
if (tsBuffer == NULL) {
- err = extractAndQueueAccessUnits(buffer, itemMeta);
+ status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
if (err == -EAGAIN) {
// starting sequence number too low/high
postMonitorQueue();
return;
} else if (err == ERROR_OUT_OF_RANGE) {
// reached stopping point
- stopAsync(/* clear = */false);
+ notifyStopReached();
+ return;
+ } else if (err != OK) {
+ notifyError(err);
return;
}
}
- if (err != OK) {
- notifyError(err);
- return;
+ ++mSeqNumber;
+
+ // if adapting, pause after found the next starting point
+ if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
+ CHECK(mStartTimeUsNotify != NULL);
+ mStartTimeUsNotify->post();
+ mStartTimeUsNotify.clear();
+ shouldPause = true;
}
- ++mSeqNumber;
- postMonitorQueue();
+ if (!shouldPause) {
+ postMonitorQueue();
+ }
}
int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(
- int64_t anchorTimeUs, int64_t targetDurationUs) const {
+ int64_t anchorTimeUs, int64_t targetDiffUs) const {
int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
if (mPlaylist->meta() == NULL
|| !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
@@ -1336,8 +1402,8 @@ int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(
lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;
int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
- // adjust anchorTimeUs to within 1x targetDurationUs from mStartTimeUs
- while (index >= 0 && anchorTimeUs - mStartTimeUs > targetDurationUs) {
+ // adjust anchorTimeUs to within targetDiffUs from mStartTimeUs
+ while (index >= 0 && anchorTimeUs - mStartTimeUs > targetDiffUs) {
sp<AMessage> itemMeta;
CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
@@ -1439,6 +1505,7 @@ const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
+ accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
return accessUnit;
}
@@ -1477,30 +1544,15 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
for (size_t i = mPacketSources.size(); i-- > 0;) {
sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
- const char *key;
- ATSParser::SourceType type;
const LiveSession::StreamType stream = mPacketSources.keyAt(i);
- switch (stream) {
- case LiveSession::STREAMTYPE_VIDEO:
- type = ATSParser::VIDEO;
- key = "timeUsVideo";
- break;
-
- case LiveSession::STREAMTYPE_AUDIO:
- type = ATSParser::AUDIO;
- key = "timeUsAudio";
- break;
-
- case LiveSession::STREAMTYPE_SUBTITLES:
- {
- ALOGE("MPEG2 Transport streams do not contain subtitles.");
- return ERROR_MALFORMED;
- break;
- }
-
- default:
- TRESPASS();
+ if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
+ ALOGE("MPEG2 Transport streams do not contain subtitles.");
+ return ERROR_MALFORMED;
}
+ const char *key = LiveSession::getKeyForStream(stream);
+ ATSParser::SourceType type =
+ (stream == LiveSession::STREAMTYPE_AUDIO) ?
+ ATSParser::AUDIO : ATSParser::VIDEO;
sp<AnotherPacketSource> source =
static_cast<AnotherPacketSource *>(
@@ -1523,8 +1575,58 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
int64_t timeUs;
CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
+ bool seeking = mSeekMode == LiveSession::kSeekModeExactPosition;
if (mSegmentFirstPTS < 0ll) {
mSegmentFirstPTS = timeUs;
+ if (!seeking) {
+ int32_t firstSeqNumberInPlaylist;
+ if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
+ "media-sequence", &firstSeqNumberInPlaylist)) {
+ firstSeqNumberInPlaylist = 0;
+ }
+
+ int32_t targetDurationSecs;
+ CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
+ int64_t targetDurationUs = targetDurationSecs * 1000000ll;
+ // mStartup
+ // mStartup is true until we have queued a packet for all the streams
+ // we are fetching. We queue packets whose timestamps are greater than
+ // mStartTimeUs.
+ // mSegmentStartTimeUs >= 0
+ // mSegmentStartTimeUs is non-negative when adapting or switching tracks
+ // mSeqNumber > firstSeqNumberInPlaylist
+ // don't decrement mSeqNumber if it already points to the 1st segment
+ // timeUs - mStartTimeUs > targetDurationUs:
+ // This and the 2 above conditions should only happen when adapting in a live
+ // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher
+ // would start fetching after timeUs, which should be greater than mStartTimeUs;
+ // the old fetcher would then continue fetching data until timeUs. We don't want
+ // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to
+ // stop as early as possible. The definition of being "too far ahead" is
+ // arbitrary; here we use targetDurationUs as threshold.
+ int64_t targetDiffUs =(mSeekMode == LiveSession::kSeekModeNextSample
+ ? 0 : targetDurationUs);
+ if (mStartup && mSegmentStartTimeUs >= 0
+ && mSeqNumber > firstSeqNumberInPlaylist
+ && timeUs - mStartTimeUs > targetDiffUs) {
+ // we just guessed a starting timestamp that is too high when adapting in a
+ // live stream; re-adjust based on the actual timestamp extracted from the
+ // media segment; if we didn't move backward after the re-adjustment
+ // (newSeqNumber), start at least 1 segment prior.
+ int32_t newSeqNumber = getSeqNumberWithAnchorTime(
+ timeUs, targetDiffUs);
+ if (newSeqNumber >= mSeqNumber) {
+ --mSeqNumber;
+ } else {
+ mSeqNumber = newSeqNumber;
+ }
+ mStartTimeUsNotify = mNotify->dup();
+ mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
+ mStartTimeUsNotify->setString("uri", mURI);
+ mIDRFound = false;
+ return -EAGAIN;
+ }
+ }
}
if (mStartup) {
if (!mFirstPTSValid) {
@@ -1538,10 +1640,8 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
}
}
- bool seeking = mSeekMode == LiveSession::kSeekModeExactPosition;
bool startTimeReached =
- seeking ? (timeUs >= mStartTimeUs)
- : (timeUs > mStartTimeUs);
+ seeking ? (timeUs >= mStartTimeUs) : true;
if (!startTimeReached || (isAvc && !mIDRFound)) {
// buffer up to the closest preceding IDR frame in the next segement,
@@ -1562,51 +1662,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
}
if (mStartTimeUsNotify != NULL) {
- int32_t firstSeqNumberInPlaylist;
- if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
- "media-sequence", &firstSeqNumberInPlaylist)) {
- firstSeqNumberInPlaylist = 0;
- }
-
- int32_t targetDurationSecs;
- CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
- int64_t targetDurationUs = targetDurationSecs * 1000000ll;
- // mStartup
- // mStartup is true until we have queued a packet for all the streams
- // we are fetching. We queue packets whose timestamps are greater than
- // mStartTimeUs.
- // mSegmentStartTimeUs >= 0
- // mSegmentStartTimeUs is non-negative when adapting or switching tracks
- // mSeqNumber > firstSeqNumberInPlaylist
- // don't decrement mSeqNumber if it already points to the 1st segment
- // timeUs - mStartTimeUs > targetDurationUs:
- // This and the 2 above conditions should only happen when adapting in a live
- // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher
- // would start fetching after timeUs, which should be greater than mStartTimeUs;
- // the old fetcher would then continue fetching data until timeUs. We don't want
- // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to
- // stop as early as possible. The definition of being "too far ahead" is
- // arbitrary; here we use targetDurationUs as threshold.
- if (mStartup && mSegmentStartTimeUs >= 0
- && mSeqNumber > firstSeqNumberInPlaylist
- && timeUs - mStartTimeUs > targetDurationUs) {
- // we just guessed a starting timestamp that is too high when adapting in a
- // live stream; re-adjust based on the actual timestamp extracted from the
- // media segment; if we didn't move backward after the re-adjustment
- // (newSeqNumber), start at least 1 segment prior.
- int32_t newSeqNumber = getSeqNumberWithAnchorTime(
- timeUs, targetDurationUs);
- if (newSeqNumber >= mSeqNumber) {
- --mSeqNumber;
- } else {
- mSeqNumber = newSeqNumber;
- }
- mStartTimeUsNotify = mNotify->dup();
- mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
- mIDRFound = false;
- return -EAGAIN;
- }
-
int32_t seq;
if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
@@ -1622,12 +1677,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
if (streamMask == mStreamTypeMask) {
mStartup = false;
- // only need to post if we're switching and searching for a
- // start point in next segment, or next IDR
- if (mSeekMode != LiveSession::kSeekModeExactPosition) {
- mStartTimeUsNotify->post();
- }
- mStartTimeUsNotify.clear();
}
}
}
@@ -1887,11 +1936,13 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
int64_t targetDurationUs = targetDurationSecs * 1000000ll;
+ int64_t targetDiffUs =(mSeekMode == LiveSession::kSeekModeNextSample
+ ? 0 : targetDurationUs);
// Duplicated logic from how we handle .ts playlists.
if (mStartup && mSegmentStartTimeUs >= 0
- && timeUs - mStartTimeUs > targetDurationUs) {
+ && timeUs - mStartTimeUs > targetDiffUs) {
int32_t newSeqNumber = getSeqNumberWithAnchorTime(
- timeUs, targetDurationUs);
+ timeUs, targetDiffUs);
if (newSeqNumber >= mSeqNumber) {
--mSeqNumber;
} else {
@@ -1903,8 +1954,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
mStartTimeUsNotify->setInt64("timeUsAudio", timeUs);
mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
- mStartTimeUsNotify->post();
- mStartTimeUsNotify.clear();
mStartup = false;
}
}
@@ -1953,4 +2002,15 @@ void PlaylistFetcher::updateDuration() {
msg->post();
}
+void PlaylistFetcher::updateTargetDuration() {
+ int32_t targetDurationSecs;
+ CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
+ int64_t targetDurationUs = targetDurationSecs * 1000000ll;
+
+ sp<AMessage> msg = mNotify->dup();
+ msg->setInt32("what", kWhatTargetDurationUpdate);
+ msg->setInt64("targetDurationUs", targetDurationUs);
+ msg->post();
+}
+
} // namespace android
diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h
index 8d34cbc..dab56df 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.h
+++ b/media/libstagefright/httplive/PlaylistFetcher.h
@@ -44,9 +44,11 @@ struct PlaylistFetcher : public AHandler {
kWhatStopped,
kWhatError,
kWhatDurationUpdate,
+ kWhatTargetDurationUpdate,
kWhatPrepared,
kWhatPreparationFailed,
kWhatStartedAt,
+ kWhatStopReached,
};
PlaylistFetcher(
@@ -64,7 +66,7 @@ struct PlaylistFetcher : public AHandler {
int64_t startTimeUs = -1ll, // starting timestamps
int64_t segmentStartTimeUs = -1ll, // starting position within playlist
// startTimeUs!=segmentStartTimeUs only when playlist is live
- int32_t startDiscontinuitySeq = 0,
+ int32_t startDiscontinuitySeq = -1,
LiveSession::SeekMode seekMode = LiveSession::kSeekModeExactPosition);
void pauseAsync(float thresholdRatio);
@@ -135,7 +137,6 @@ private:
bool mStartup;
bool mIDRFound;
int32_t mSeekMode;
- bool mPrepared;
bool mTimeChangeSignaled;
int64_t mNextPTSTimeUs;
@@ -187,7 +188,7 @@ private:
void postMonitorQueue(int64_t delayUs = 0, int64_t minDelayUs = 0);
void cancelMonitorQueue();
void setStoppingThreshold(float thresholdRatio);
- bool shouldPauseDownload(bool startFound);
+ bool shouldPauseDownload();
int64_t delayUsToRefreshPlaylist() const;
status_t refreshPlaylist();
@@ -195,6 +196,8 @@ private:
// Returns the media time in us of the segment specified by seqNumber.
// This is computed by summing the durations of all segments before it.
int64_t getSegmentStartTimeUs(int32_t seqNumber) const;
+ // Returns the duration time in us of the segment specified.
+ int64_t getSegmentDurationUs(int32_t seqNumber) const;
status_t onStart(const sp<AMessage> &msg);
void onPause();
@@ -219,6 +222,7 @@ private:
status_t extractAndQueueAccessUnits(
const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta);
+ void notifyStopReached();
void notifyError(status_t err);
void queueDiscontinuity(
@@ -230,6 +234,7 @@ private:
int32_t getSeqNumberForTime(int64_t timeUs) const;
void updateDuration();
+ void updateTargetDuration();
DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher);
};
diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp
index 9f42217..c2f1527 100644
--- a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp
+++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp
@@ -19,6 +19,8 @@
#include "AnotherPacketSource.h"
+#include "include/avc_utils.h"
+
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
@@ -38,6 +40,7 @@ const int64_t kNearEOSMarkUs = 2000000ll; // 2 secs
AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
: mIsAudio(false),
mIsVideo(false),
+ mEnabled(true),
mFormat(NULL),
mLastQueuedTimeUs(0),
mEOSResult(OK),
@@ -155,7 +158,6 @@ status_t AnotherPacketSource::read(
const sp<ABuffer> buffer = *mBuffers.begin();
mBuffers.erase(mBuffers.begin());
- mLatestDequeuedMeta = buffer->meta()->dup();
int32_t discontinuity;
if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
@@ -166,6 +168,8 @@ status_t AnotherPacketSource::read(
return INFO_DISCONTINUITY;
}
+ mLatestDequeuedMeta = buffer->meta()->dup();
+
sp<RefBase> object;
if (buffer->meta()->findObject("format", &object)) {
setFormat(static_cast<MetaData*>(object.get()));
@@ -205,20 +209,26 @@ void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
return;
}
- int64_t lastQueuedTimeUs;
- CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
- mLastQueuedTimeUs = lastQueuedTimeUs;
- ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)", mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
-
Mutex::Autolock autoLock(mLock);
mBuffers.push_back(buffer);
mCondition.signal();
int32_t discontinuity;
if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
+ // discontinuity handling needs to be consistent with queueDiscontinuity()
++mQueuedDiscontinuityCount;
+ mLastQueuedTimeUs = 0ll;
+ mEOSResult = OK;
+ mLatestEnqueuedMeta = NULL;
+ return;
}
+ int64_t lastQueuedTimeUs;
+ CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
+ mLastQueuedTimeUs = lastQueuedTimeUs;
+ ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
+ mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
+
if (mLatestEnqueuedMeta == NULL) {
mLatestEnqueuedMeta = buffer->meta()->dup();
} else {
@@ -299,6 +309,9 @@ void AnotherPacketSource::signalEOS(status_t result) {
bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
Mutex::Autolock autoLock(mLock);
*finalResult = OK;
+ if (!mEnabled) {
+ return false;
+ }
if (!mBuffers.empty()) {
return true;
}
@@ -310,6 +323,9 @@ bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
Mutex::Autolock autoLock(mLock);
*finalResult = OK;
+ if (!mEnabled) {
+ return false;
+ }
List<sp<ABuffer> >::iterator it;
for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
int32_t discontinuity;
@@ -440,4 +456,140 @@ sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
return mLatestDequeuedMeta;
}
+void AnotherPacketSource::enable(bool enable) {
+ Mutex::Autolock autoLock(mLock);
+ mEnabled = enable;
+}
+
+sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
+ Mutex::Autolock autoLock(mLock);
+ int64_t firstUs = -1;
+ int64_t lastUs = -1;
+ int64_t durationUs = 0;
+
+ List<sp<ABuffer> >::iterator it;
+ for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
+ const sp<ABuffer> &buffer = *it;
+ int32_t discontinuity;
+ if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
+ durationUs += lastUs - firstUs;
+ firstUs = -1;
+ lastUs = -1;
+ continue;
+ }
+ int64_t timeUs;
+ if (buffer->meta()->findInt64("timeUs", &timeUs)) {
+ if (firstUs < 0) {
+ firstUs = timeUs;
+ }
+ if (lastUs < 0 || timeUs > lastUs) {
+ lastUs = timeUs;
+ }
+ if (durationUs + (lastUs - firstUs) >= delayUs) {
+ return buffer->meta();
+ }
+ }
+ }
+ return mLatestEnqueuedMeta;
+}
+
+void AnotherPacketSource::trimBuffersAfterTimeUs(
+ size_t discontinuitySeq, int64_t timeUs) {
+ ALOGV("trimBuffersAfterTimeUs: discontinuitySeq %zu, timeUs %lld",
+ discontinuitySeq, (long long)timeUs);
+
+ Mutex::Autolock autoLock(mLock);
+ if (mBuffers.empty()) {
+ return;
+ }
+
+ List<sp<ABuffer> >::iterator it;
+ sp<AMessage> newLatestEnqueuedMeta = NULL;
+ int64_t newLastQueuedTimeUs = 0;
+ size_t newDiscontinuityCount = 0;
+ for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
+ const sp<ABuffer> &buffer = *it;
+ int32_t discontinuity;
+ if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
+ newDiscontinuityCount++;
+ continue;
+ }
+ size_t curDiscontinuitySeq;
+ int64_t curTimeUs;
+ CHECK(buffer->meta()->findInt32(
+ "discontinuitySeq", (int32_t*)&curDiscontinuitySeq));
+ CHECK(buffer->meta()->findInt64("timeUs", &curTimeUs));
+ if ((curDiscontinuitySeq > discontinuitySeq
+ || (curDiscontinuitySeq == discontinuitySeq
+ && curTimeUs >= timeUs))) {
+ ALOGI("trimming from %lld (inclusive) to end",
+ (long long)curTimeUs);
+ break;
+ }
+ newLatestEnqueuedMeta = buffer->meta();
+ newLastQueuedTimeUs = curTimeUs;
+ }
+ mBuffers.erase(it, mBuffers.end());
+ mLatestEnqueuedMeta = newLatestEnqueuedMeta;
+ mLastQueuedTimeUs = newLastQueuedTimeUs;
+ mQueuedDiscontinuityCount = newDiscontinuityCount;
+}
+
+sp<AMessage> AnotherPacketSource::trimBuffersBeforeTimeUs(
+ size_t discontinuitySeq, int64_t timeUs) {
+ ALOGV("trimBuffersBeforeTimeUs: discontinuitySeq %zu, timeUs %lld",
+ discontinuitySeq, (long long)timeUs);
+ sp<AMessage> meta;
+ Mutex::Autolock autoLock(mLock);
+ if (mBuffers.empty()) {
+ return NULL;
+ }
+
+ sp<MetaData> format;
+ bool isAvc = false;
+
+ List<sp<ABuffer> >::iterator it;
+ size_t discontinuityCount = 0;
+ for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
+ const sp<ABuffer> &buffer = *it;
+ int32_t discontinuity;
+ if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
+ format = NULL;
+ isAvc = false;
+ discontinuityCount++;
+ continue;
+ }
+ if (format == NULL) {
+ sp<RefBase> object;
+ if (buffer->meta()->findObject("format", &object)) {
+ const char* mime;
+ format = static_cast<MetaData*>(object.get());
+ isAvc = format != NULL
+ && format->findCString(kKeyMIMEType, &mime)
+ && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
+ }
+ }
+ if (isAvc && !IsIDR(buffer)) {
+ continue;
+ }
+ size_t curDiscontinuitySeq;
+ int64_t curTimeUs;
+ CHECK(buffer->meta()->findInt32(
+ "discontinuitySeq", (int32_t*)&curDiscontinuitySeq));
+ CHECK(buffer->meta()->findInt64("timeUs", &curTimeUs));
+ if ((curDiscontinuitySeq > discontinuitySeq
+ || (curDiscontinuitySeq == discontinuitySeq
+ && curTimeUs > timeUs))) {
+ ALOGI("trimming from beginning to %lld (not inclusive)",
+ (long long)curTimeUs);
+ meta = buffer->meta();
+ break;
+ }
+ }
+ mBuffers.erase(mBuffers.begin(), it);
+ mQueuedDiscontinuityCount -= discontinuityCount;
+ mLatestDequeuedMeta = NULL;
+ return meta;
+}
+
} // namespace android
diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.h b/media/libstagefright/mpeg2ts/AnotherPacketSource.h
index d4fde7c..e126006 100644
--- a/media/libstagefright/mpeg2ts/AnotherPacketSource.h
+++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.h
@@ -70,8 +70,14 @@ struct AnotherPacketSource : public MediaSource {
bool isFinished(int64_t duration) const;
+ void enable(bool enable);
+
sp<AMessage> getLatestEnqueuedMeta();
sp<AMessage> getLatestDequeuedMeta();
+ sp<AMessage> getMetaAfterLastDequeued(int64_t delayUs);
+
+ void trimBuffersAfterTimeUs(size_t discontinuitySeq, int64_t timeUs);
+ sp<AMessage> trimBuffersBeforeTimeUs(size_t discontinuitySeq, int64_t timeUs);
protected:
virtual ~AnotherPacketSource();
@@ -82,6 +88,7 @@ private:
bool mIsAudio;
bool mIsVideo;
+ bool mEnabled;
sp<MetaData> mFormat;
int64_t mLastQueuedTimeUs;
List<sp<ABuffer> > mBuffers;