summaryrefslogtreecommitdiffstats
path: root/media/libstagefright/httplive
diff options
context:
space:
mode:
authorRobert Shih <robertshih@google.com>2014-02-20 13:07:26 -0800
committerRobert Shih <robertshih@google.com>2014-02-28 16:43:45 -0800
commit1543d3c735a5ba4ddfcf8ab644575df13c7e30a9 (patch)
treeb9dcbb84dd7b6e7192faf577ebebd8b7adc7ea28 /media/libstagefright/httplive
parent68d074fe4538e0f1bd647c3f3aa932ea7dca332a (diff)
downloadframeworks_av-1543d3c735a5ba4ddfcf8ab644575df13c7e30a9.zip
frameworks_av-1543d3c735a5ba4ddfcf8ab644575df13c7e30a9.tar.gz
frameworks_av-1543d3c735a5ba4ddfcf8ab644575df13c7e30a9.tar.bz2
Initial HLS seamless switch implementation.
Bug: 11854054 Change-Id: I75fc2a258111295039ac13cc37e407df25891dd2
Diffstat (limited to 'media/libstagefright/httplive')
-rw-r--r--media/libstagefright/httplive/LiveSession.cpp280
-rw-r--r--media/libstagefright/httplive/LiveSession.h36
-rw-r--r--media/libstagefright/httplive/PlaylistFetcher.cpp251
-rw-r--r--media/libstagefright/httplive/PlaylistFetcher.h22
4 files changed, 541 insertions, 48 deletions
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp
index 95779c4..2af4682 100644
--- a/media/libstagefright/httplive/LiveSession.cpp
+++ b/media/libstagefright/httplive/LiveSession.cpp
@@ -40,6 +40,8 @@
#include <media/stagefright/MetaData.h>
#include <media/stagefright/Utils.h>
+#include <utils/Mutex.h>
+
#include <ctype.h>
#include <openssl/aes.h>
#include <openssl/md5.h>
@@ -56,10 +58,14 @@ LiveSession::LiveSession(
mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
mPrevBandwidthIndex(-1),
mStreamMask(0),
+ mNewStreamMask(0),
+ mSwapMask(0),
mCheckBandwidthGeneration(0),
+ mSwitchGeneration(0),
mLastDequeuedTimeUs(0ll),
mRealTimeBaseUs(0ll),
mReconfigurationInProgress(false),
+ mSwitchInProgress(false),
mDisconnectReplyID(0) {
mStreams[kAudioIndex] = StreamItem("audio");
@@ -68,16 +74,37 @@ LiveSession::LiveSession(
for (size_t i = 0; i < kMaxStreams; ++i) {
mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
+ mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
}
}
LiveSession::~LiveSession() {
}
+sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
+ ABuffer *discontinuity = new ABuffer(0);
+ discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
+ discontinuity->meta()->setInt32("swapPacketSource", swap);
+ discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
+ discontinuity->meta()->setInt64("timeUs", -1);
+ return discontinuity;
+}
+
+void LiveSession::swapPacketSource(StreamType stream) {
+ sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
+ sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
+ sp<AnotherPacketSource> tmp = aps;
+ aps = aps2;
+ aps2 = tmp;
+ aps2->clear();
+}
+
status_t LiveSession::dequeueAccessUnit(
StreamType stream, sp<ABuffer> *accessUnit) {
if (!(mStreamMask & stream)) {
- return UNKNOWN_ERROR;
+ // return -EWOULDBLOCK to avoid halting the decoder
+ // when switching between audio/video and audio only.
+ return -EWOULDBLOCK;
}
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
@@ -117,6 +144,25 @@ status_t LiveSession::dequeueAccessUnit(
streamStr,
type,
extra == NULL ? "NULL" : extra->debugString().c_str());
+
+ int32_t swap;
+ if (type == ATSParser::DISCONTINUITY_FORMATCHANGE
+ && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap)
+ && swap) {
+
+ int32_t switchGeneration;
+ CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
+ {
+ Mutex::Autolock lock(mSwapMutex);
+ if (switchGeneration == mSwitchGeneration) {
+ swapPacketSource(stream);
+ sp<AMessage> msg = new AMessage(kWhatSwapped, id());
+ msg->setInt32("stream", stream);
+ msg->setInt32("switchGeneration", switchGeneration);
+ msg->post();
+ }
+ }
+ }
} else if (err == OK) {
if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
int64_t timeUs;
@@ -138,6 +184,7 @@ status_t LiveSession::dequeueAccessUnit(
}
status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
+ // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
if (!(mStreamMask & stream)) {
return UNKNOWN_ERROR;
}
@@ -234,7 +281,12 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
if (what == PlaylistFetcher::kWhatStopped) {
AString uri;
CHECK(msg->findString("uri", &uri));
- mFetcherInfos.removeItem(uri);
+ if (mFetcherInfos.removeItem(uri) < 0) {
+ // ignore duplicated kWhatStopped messages.
+ break;
+ }
+
+ tryToFinishBandwidthSwitch();
}
if (mContinuation != NULL) {
@@ -270,6 +322,8 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
postPrepared(err);
}
+ cancelBandwidthSwitch();
+
mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
@@ -308,6 +362,27 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
+ case PlaylistFetcher::kWhatStartedAt:
+ {
+ int32_t switchGeneration;
+ CHECK(msg->findInt32("switchGeneration", &switchGeneration));
+
+ if (switchGeneration != mSwitchGeneration) {
+ break;
+ }
+
+ // Resume fetcher for the original variant; the resumed fetcher should
+ // continue until the timestamps found in msg, which is stored by the
+ // new fetcher to indicate where the new variant has started buffering.
+ for (size_t i = 0; i < mFetcherInfos.size(); i++) {
+ const FetcherInfo info = mFetcherInfos.valueAt(i);
+ if (info.mToBeRemoved) {
+ info.mFetcher->resumeUntilAsync(msg);
+ }
+ }
+ break;
+ }
+
default:
TRESPASS();
}
@@ -352,6 +427,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
+ case kWhatSwapped:
+ {
+ onSwapped(msg);
+ break;
+ }
default:
TRESPASS();
break;
@@ -462,6 +542,10 @@ void LiveSession::finishDisconnect() {
// during disconnection either.
cancelCheckBandwidthEvent();
+ // Protect mPacketSources from a swapPacketSource race condition through disconnect.
+ // (finishDisconnect, onFinishDisconnect2)
+ cancelBandwidthSwitch();
+
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
mFetcherInfos.valueAt(i).mFetcher->stopAsync();
}
@@ -501,11 +585,13 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
notify->setString("uri", uri);
+ notify->setInt32("switchGeneration", mSwitchGeneration);
FetcherInfo info;
info.mFetcher = new PlaylistFetcher(notify, this, uri);
info.mDurationUs = -1ll;
info.mIsPrepared = false;
+ info.mToBeRemoved = false;
looper()->registerHandler(info.mFetcher);
mFetcherInfos.add(uri, info);
@@ -845,8 +931,25 @@ status_t LiveSession::selectTrack(size_t index, bool select) {
return err;
}
+bool LiveSession::canSwitchUp() {
+ // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
+ status_t err = OK;
+ for (size_t i = 0; i < mPacketSources.size(); ++i) {
+ sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
+ int64_t dur = source->getBufferedDurationUs(&err);
+ if (err == OK && dur > 10000000) {
+ return true;
+ }
+ }
+ return false;
+}
+
void LiveSession::changeConfiguration(
int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
+ // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
+ // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
+ cancelBandwidthSwitch();
+
CHECK(!mReconfigurationInProgress);
mReconfigurationInProgress = true;
@@ -862,7 +965,8 @@ void LiveSession::changeConfiguration(
CHECK_LT(bandwidthIndex, mBandwidthItems.size());
const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
- uint32_t streamMask = 0;
+ uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
+ uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
AString URIs[kMaxStreams];
for (size_t i = 0; i < kMaxStreams; ++i) {
@@ -880,9 +984,14 @@ void LiveSession::changeConfiguration(
// If we're seeking all current fetchers are discarded.
if (timeUs < 0ll) {
+ // delay fetcher removal
+ discardFetcher = false;
+
for (size_t j = 0; j < kMaxStreams; ++j) {
- if ((streamMask & indexToType(j)) && uri == URIs[j]) {
- discardFetcher = false;
+ StreamType type = indexToType(j);
+ if ((streamMask & type) && uri == URIs[j]) {
+ resumeMask |= type;
+ streamMask &= ~type;
}
}
}
@@ -894,8 +1003,15 @@ void LiveSession::changeConfiguration(
}
}
- sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
+ sp<AMessage> msg;
+ if (timeUs < 0ll) {
+ // skip onChangeConfiguration2 (decoder destruction) if switching.
+ msg = new AMessage(kWhatChangeConfiguration3, id());
+ } else {
+ msg = new AMessage(kWhatChangeConfiguration2, id());
+ }
msg->setInt32("streamMask", streamMask);
+ msg->setInt32("resumeMask", resumeMask);
msg->setInt64("timeUs", timeUs);
for (size_t i = 0; i < kMaxStreams; ++i) {
if (streamMask & indexToType(i)) {
@@ -978,11 +1094,13 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
}
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
+ mContinuation.clear();
// All remaining fetchers are still suspended, the player has shutdown
// any decoders that needed it.
- uint32_t streamMask;
+ uint32_t streamMask, resumeMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
+ CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
for (size_t i = 0; i < kMaxStreams; ++i) {
if (streamMask & indexToType(i)) {
@@ -991,37 +1109,39 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
}
int64_t timeUs;
+ bool switching = false;
CHECK(msg->findInt64("timeUs", &timeUs));
if (timeUs < 0ll) {
timeUs = mLastDequeuedTimeUs;
+ switching = true;
}
mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
- mStreamMask = streamMask;
+ mNewStreamMask = streamMask;
- // Resume all existing fetchers and assign them packet sources.
+ // Of all existing fetchers:
+ // * Resume fetchers that are still needed and assign them original packet sources.
+ // * Mark otherwise unneeded fetchers for removal.
+ ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
const AString &uri = mFetcherInfos.keyAt(i);
- uint32_t resumeMask = 0;
-
sp<AnotherPacketSource> sources[kMaxStreams];
for (size_t j = 0; j < kMaxStreams; ++j) {
- if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
+ if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
sources[j] = mPacketSources.valueFor(indexToType(j));
- resumeMask |= indexToType(j);
}
}
- CHECK_NE(resumeMask, 0u);
-
- ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
-
- streamMask &= ~resumeMask;
-
- mFetcherInfos.valueAt(i).mFetcher->startAsync(
- sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
+ FetcherInfo &info = mFetcherInfos.editValueAt(i);
+ if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
+ || sources[kSubtitleIndex] != NULL) {
+ info.mFetcher->startAsync(
+ sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
+ } else {
+ info.mToBeRemoved = true;
+ }
}
// streamMask now only contains the types that need a new fetcher created.
@@ -1030,6 +1150,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
ALOGV("creating new fetchers for mask 0x%08x", streamMask);
}
+ // Find out when the original fetchers have buffered up to and start the new fetchers
+ // at a later timestamp.
for (size_t i = 0; i < kMaxStreams; i++) {
if (!(indexToType(i) & streamMask)) {
continue;
@@ -1041,12 +1163,40 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
CHECK(fetcher != NULL);
+ int32_t latestSeq = -1;
+ int64_t latestTimeUs = 0ll;
sp<AnotherPacketSource> sources[kMaxStreams];
+
// TRICKY: looping from i as earlier streams are already removed from streamMask
for (size_t j = i; j < kMaxStreams; ++j) {
if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
sources[j] = mPacketSources.valueFor(indexToType(j));
- sources[j]->clear();
+
+ if (!switching) {
+ sources[j]->clear();
+ } else {
+ int32_t type, seq;
+ int64_t srcTimeUs;
+ sp<AMessage> meta = sources[j]->getLatestMeta();
+
+ if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
+ CHECK(meta->findInt32("seq", &seq));
+ if (seq > latestSeq) {
+ latestSeq = seq;
+ }
+ CHECK(meta->findInt64("timeUs", &srcTimeUs));
+ if (srcTimeUs > latestTimeUs) {
+ latestTimeUs = srcTimeUs;
+ }
+ }
+
+ sources[j] = mPacketSources2.valueFor(indexToType(j));
+ sources[j]->clear();
+ uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
+ if (extraStreams & indexToType(j)) {
+ sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
+ }
+ }
streamMask &= ~indexToType(j);
}
@@ -1056,7 +1206,9 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
sources[kAudioIndex],
sources[kVideoIndex],
sources[kSubtitleIndex],
- timeUs);
+ timeUs,
+ latestTimeUs /* min start time(us) */,
+ latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
}
// All fetchers have now been started, the configuration change
@@ -1065,14 +1217,61 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
scheduleCheckBandwidthEvent();
ALOGV("XXX configuration change completed.");
-
mReconfigurationInProgress = false;
+ if (switching) {
+ mSwitchInProgress = true;
+ } else {
+ mStreamMask = mNewStreamMask;
+ }
if (mDisconnectReplyID != 0) {
finishDisconnect();
}
}
+void LiveSession::onSwapped(const sp<AMessage> &msg) {
+ int32_t switchGeneration;
+ CHECK(msg->findInt32("switchGeneration", &switchGeneration));
+ if (switchGeneration != mSwitchGeneration) {
+ return;
+ }
+
+ int32_t stream;
+ CHECK(msg->findInt32("stream", &stream));
+ mSwapMask |= stream;
+ if (mSwapMask != mStreamMask) {
+ return;
+ }
+
+ // Check if new variant contains extra streams.
+ uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
+ while (extraStreams) {
+ StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
+ swapPacketSource(extraStream);
+ extraStreams &= ~extraStream;
+ }
+
+ tryToFinishBandwidthSwitch();
+}
+
+// Mark switch done when:
+// 1. all old buffers are swapped out, AND
+// 2. all old fetchers are removed.
+void LiveSession::tryToFinishBandwidthSwitch() {
+ bool needToRemoveFetchers = false;
+ for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
+ if (mFetcherInfos.valueAt(i).mToBeRemoved) {
+ needToRemoveFetchers = true;
+ break;
+ }
+ }
+ if (!needToRemoveFetchers && mSwapMask == mStreamMask) {
+ mStreamMask = mNewStreamMask;
+ mSwitchInProgress = false;
+ mSwapMask = 0;
+ }
+}
+
void LiveSession::scheduleCheckBandwidthEvent() {
sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
msg->setInt32("generation", mCheckBandwidthGeneration);
@@ -1083,16 +1282,37 @@ void LiveSession::cancelCheckBandwidthEvent() {
++mCheckBandwidthGeneration;
}
-void LiveSession::onCheckBandwidth() {
- if (mReconfigurationInProgress) {
- scheduleCheckBandwidthEvent();
- return;
+void LiveSession::cancelBandwidthSwitch() {
+ Mutex::Autolock lock(mSwapMutex);
+ mSwitchGeneration++;
+ mSwitchInProgress = false;
+ mSwapMask = 0;
+}
+
+bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
+ if (mReconfigurationInProgress || mSwitchInProgress) {
+ return false;
+ }
+
+ if (mPrevBandwidthIndex < 0) {
+ return true;
}
+ if (bandwidthIndex == (size_t)mPrevBandwidthIndex) {
+ return false;
+ } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) {
+ return canSwitchUp();
+ } else {
+ return true;
+ }
+}
+
+void LiveSession::onCheckBandwidth() {
size_t bandwidthIndex = getBandwidthIndex();
- if (mPrevBandwidthIndex < 0
- || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
+ if (canSwitchBandwidthTo(bandwidthIndex)) {
changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
+ } else {
+ scheduleCheckBandwidthEvent();
}
// Handling the kWhatCheckBandwidth even here does _not_ automatically
diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h
index c4d125c..91bbcea 100644
--- a/media/libstagefright/httplive/LiveSession.h
+++ b/media/libstagefright/httplive/LiveSession.h
@@ -83,6 +83,11 @@ struct LiveSession : public AHandler {
kWhatPreparationFailed,
};
+ // create a format-change discontinuity
+ //
+ // swap:
+ // whether is format-change discontinuity should trigger a buffer swap
+ sp<ABuffer> createFormatChangeBuffer(bool swap = true);
protected:
virtual ~LiveSession();
@@ -101,6 +106,7 @@ private:
kWhatChangeConfiguration2 = 'chC2',
kWhatChangeConfiguration3 = 'chC3',
kWhatFinishDisconnect2 = 'fin2',
+ kWhatSwapped = 'swap',
};
struct BandwidthItem {
@@ -112,6 +118,7 @@ private:
sp<PlaylistFetcher> mFetcher;
int64_t mDurationUs;
bool mIsPrepared;
+ bool mToBeRemoved;
};
struct StreamItem {
@@ -146,9 +153,26 @@ private:
KeyedVector<AString, FetcherInfo> mFetcherInfos;
uint32_t mStreamMask;
+ // Masks used during reconfiguration:
+ // mNewStreamMask: streams in the variant playlist we're switching to;
+ // we don't want to immediately overwrite the original value.
+ uint32_t mNewStreamMask;
+
+ // mSwapMask: streams that have started to playback content in the new variant playlist;
+ // we use this to track reconfiguration progress.
+ uint32_t mSwapMask;
+
KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources;
+ // A second set of packet sources that buffer content for the variant we're switching to.
+ KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2;
+
+ // A mutex used to serialize two sets of events:
+ // * the swapping of packet sources in dequeueAccessUnit on the player thread, AND
+ // * a forced bandwidth switch termination in cancelSwitch on the live looper.
+ Mutex mSwapMutex;
int32_t mCheckBandwidthGeneration;
+ int32_t mSwitchGeneration;
size_t mContinuationCounter;
sp<AMessage> mContinuation;
@@ -157,6 +181,7 @@ private:
int64_t mRealTimeBaseUs;
bool mReconfigurationInProgress;
+ bool mSwitchInProgress;
uint32_t mDisconnectReplyID;
sp<PlaylistFetcher> addFetcher(const char *uri);
@@ -199,16 +224,27 @@ private:
void onChangeConfiguration(const sp<AMessage> &msg);
void onChangeConfiguration2(const sp<AMessage> &msg);
void onChangeConfiguration3(const sp<AMessage> &msg);
+ void onSwapped(const sp<AMessage> &msg);
+ void tryToFinishBandwidthSwitch();
void scheduleCheckBandwidthEvent();
void cancelCheckBandwidthEvent();
+ // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources
+ // from being swapped out on stale discontinuities while manipulating
+ // mPacketSources/mPacketSources2.
+ void cancelBandwidthSwitch();
+
+ bool canSwitchBandwidthTo(size_t bandwidthIndex);
void onCheckBandwidth();
void finishDisconnect();
void postPrepared(status_t err);
+ void swapPacketSource(StreamType stream);
+ bool canSwitchUp();
+
DISALLOW_EVIL_CONSTRUCTORS(LiveSession);
};
diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp
index 030cbde..1a02e85 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.cpp
+++ b/media/libstagefright/httplive/PlaylistFetcher.cpp
@@ -48,16 +48,20 @@ namespace android {
// static
const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
+const int32_t PlaylistFetcher::kNumSkipFrames = 10;
PlaylistFetcher::PlaylistFetcher(
const sp<AMessage> &notify,
const sp<LiveSession> &session,
const char *uri)
: mNotify(notify),
+ mStartTimeUsNotify(notify->dup()),
mSession(session),
mURI(uri),
mStreamTypeMask(0),
mStartTimeUs(-1ll),
+ mMinStartTimeUs(0ll),
+ mStopParams(NULL),
mLastPlaylistFetchTimeUs(-1ll),
mSeqNumber(-1),
mNumRetries(0),
@@ -69,6 +73,8 @@ PlaylistFetcher::PlaylistFetcher(
mFirstPTSValid(false),
mAbsoluteTimeAnchorUs(0ll) {
memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
+ mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
+ mStartTimeUsNotify->setInt32("streamMask", 0);
}
PlaylistFetcher::~PlaylistFetcher() {
@@ -325,7 +331,9 @@ void PlaylistFetcher::startAsync(
const sp<AnotherPacketSource> &audioSource,
const sp<AnotherPacketSource> &videoSource,
const sp<AnotherPacketSource> &subtitleSource,
- int64_t startTimeUs) {
+ int64_t startTimeUs,
+ int64_t minStartTimeUs,
+ int32_t startSeqNumberHint) {
sp<AMessage> msg = new AMessage(kWhatStart, id());
uint32_t streamTypeMask = 0ul;
@@ -347,6 +355,8 @@ void PlaylistFetcher::startAsync(
msg->setInt32("streamTypeMask", streamTypeMask);
msg->setInt64("startTimeUs", startTimeUs);
+ msg->setInt64("minStartTimeUs", minStartTimeUs);
+ msg->setInt32("startSeqNumberHint", startSeqNumberHint);
msg->post();
}
@@ -358,6 +368,12 @@ void PlaylistFetcher::stopAsync() {
(new AMessage(kWhatStop, id()))->post();
}
+void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
+ AMessage* msg = new AMessage(kWhatResumeUntil, id());
+ msg->setMessage("params", params);
+ msg->post();
+}
+
void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatStart:
@@ -392,6 +408,7 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
}
case kWhatMonitorQueue:
+ case kWhatDownloadNext:
{
int32_t generation;
CHECK(msg->findInt32("generation", &generation));
@@ -401,7 +418,17 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
break;
}
- onMonitorQueue();
+ if (msg->what() == kWhatMonitorQueue) {
+ onMonitorQueue();
+ } else {
+ onDownloadNext();
+ }
+ break;
+ }
+
+ case kWhatResumeUntil:
+ {
+ onResumeUntil(msg);
break;
}
@@ -417,7 +444,10 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
int64_t startTimeUs;
+ int32_t startSeqNumberHint;
CHECK(msg->findInt64("startTimeUs", &startTimeUs));
+ CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs));
+ CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint));
if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
void *ptr;
@@ -455,6 +485,10 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
mPrepared = false;
}
+ if (startSeqNumberHint >= 0) {
+ mSeqNumber = startSeqNumberHint;
+ }
+
postMonitorQueue();
return OK;
@@ -462,20 +496,70 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
void PlaylistFetcher::onPause() {
cancelMonitorQueue();
+}
+
+void PlaylistFetcher::onStop() {
+ cancelMonitorQueue();
mPacketSources.clear();
mStreamTypeMask = 0;
}
-void PlaylistFetcher::onStop() {
- cancelMonitorQueue();
+// Resume until we have reached the boundary timestamps listed in `msg`; when
+// the remaining time is too short (within a resume threshold) stop immediately
+// instead.
+status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
+ sp<AMessage> params;
+ CHECK(msg->findMessage("params", &params));
+
+ bool stop = false;
+ for (size_t i = 0; i < mPacketSources.size(); i++) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+
+ const char *stopKey;
+ int streamType = mPacketSources.keyAt(i);
+ switch (streamType) {
+ case LiveSession::STREAMTYPE_VIDEO:
+ stopKey = "timeUsVideo";
+ break;
- for (size_t i = 0; i < mPacketSources.size(); ++i) {
- mPacketSources.valueAt(i)->clear();
+ case LiveSession::STREAMTYPE_AUDIO:
+ stopKey = "timeUsAudio";
+ break;
+
+ case LiveSession::STREAMTYPE_SUBTITLES:
+ stopKey = "timeUsSubtitle";
+ break;
+
+ default:
+ TRESPASS();
+ }
+
+ // Don't resume if we would stop within a resume threshold.
+ int64_t latestTimeUs = 0, stopTimeUs = 0;
+ sp<AMessage> latestMeta = packetSource->getLatestMeta();
+ if (latestMeta != NULL
+ && (latestMeta->findInt64("timeUs", &latestTimeUs)
+ && params->findInt64(stopKey, &stopTimeUs))) {
+ int64_t diffUs = stopTimeUs - latestTimeUs;
+ if (diffUs < resumeThreshold(latestMeta)) {
+ stop = true;
+ }
+ }
}
- mPacketSources.clear();
- mStreamTypeMask = 0;
+ if (stop) {
+ for (size_t i = 0; i < mPacketSources.size(); i++) {
+ mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
+ }
+ stopAsync();
+ return OK;
+ }
+
+ mStopParams = params;
+ postMonitorQueue();
+
+ return OK;
}
void PlaylistFetcher::notifyError(status_t err) {
@@ -519,8 +603,9 @@ void PlaylistFetcher::onMonitorQueue() {
packetSource->getBufferedDurationUs(&finalResult);
finalResult = OK;
} else {
- bool first = true;
-
+ // Use max stream duration to prevent us from waiting on a non-existent stream;
+ // when we cannot make out from the manifest what streams are included in a playlist
+ // we might assume extra streams.
for (size_t i = 0; i < mPacketSources.size(); ++i) {
if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
continue;
@@ -528,9 +613,10 @@ void PlaylistFetcher::onMonitorQueue() {
int64_t bufferedStreamDurationUs =
mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
- if (first || bufferedStreamDurationUs < bufferedDurationUs) {
+ ALOGV("buffered %lld for stream %d",
+ bufferedStreamDurationUs, mPacketSources.keyAt(i));
+ if (bufferedStreamDurationUs > bufferedDurationUs) {
bufferedDurationUs = bufferedStreamDurationUs;
- first = false;
}
}
}
@@ -550,7 +636,12 @@ void PlaylistFetcher::onMonitorQueue() {
if (finalResult == OK && downloadMore) {
ALOGV("monitoring, buffered=%lld < %lld",
bufferedDurationUs, durationToBufferUs);
- onDownloadNext();
+ // delay the next download slightly; hopefully this gives other concurrent fetchers
+ // a better chance to run.
+ // onDownloadNext();
+ sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
+ msg->setInt32("generation", mMonitorQueueGeneration);
+ msg->post(1000l);
} else {
// Nothing to do yet, try again in a second.
@@ -617,6 +708,12 @@ void PlaylistFetcher::onDownloadNext() {
const int32_t lastSeqNumberInPlaylist =
firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
+ if (mStartup && mSeqNumber >= 0
+ && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) {
+ // in case we guessed wrong during reconfiguration, try fetching the latest content.
+ mSeqNumber = lastSeqNumberInPlaylist;
+ }
+
if (mSeqNumber < 0) {
CHECK_GE(mStartTimeUs, 0ll);
@@ -762,6 +859,18 @@ void PlaylistFetcher::onDownloadNext() {
err = extractAndQueueAccessUnits(buffer, itemMeta);
+ if (err == -EAGAIN) {
+ // bad starting sequence number hint
+ postMonitorQueue();
+ return;
+ }
+
+ if (err == ERROR_OUT_OF_RANGE) {
+ // reached stopping point
+ stopAsync();
+ return;
+ }
+
if (err != OK) {
notifyError(err);
return;
@@ -818,12 +927,15 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
}
if (mTSParser == NULL) {
- mTSParser = new ATSParser;
+ // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
+ mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
}
if (mNextPTSTimeUs >= 0ll) {
sp<AMessage> extra = new AMessage;
- extra->setInt64(IStreamListener::kKeyMediaTimeUs, mNextPTSTimeUs);
+ // Since we are using absolute timestamps, signal an offset of 0 to prevent
+ // ATSParser from skewing the timestamps of access units.
+ extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
mTSParser->signalDiscontinuity(
ATSParser::DISCONTINUITY_SEEK, extra);
@@ -842,17 +954,23 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
offset += 188;
}
+ status_t err = OK;
for (size_t i = mPacketSources.size(); i-- > 0;) {
sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+ const char *key;
ATSParser::SourceType type;
- switch (mPacketSources.keyAt(i)) {
+ const LiveSession::StreamType stream = mPacketSources.keyAt(i);
+ switch (stream) {
+
case LiveSession::STREAMTYPE_VIDEO:
type = ATSParser::VIDEO;
+ key = "timeUsVideo";
break;
case LiveSession::STREAMTYPE_AUDIO:
type = ATSParser::AUDIO;
+ key = "timeUsAudio";
break;
case LiveSession::STREAMTYPE_SUBTITLES:
@@ -879,19 +997,87 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
continue;
}
+ int64_t timeUs;
sp<ABuffer> accessUnit;
status_t finalResult;
while (source->hasBufferAvailable(&finalResult)
&& source->dequeueAccessUnit(&accessUnit) == OK) {
- // Note that we do NOT dequeue any discontinuities.
+
+ CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
+ if (mMinStartTimeUs > 0) {
+ if (timeUs < mMinStartTimeUs) {
+ // TODO untested path
+ // try a later ts
+ int32_t targetDuration;
+ mPlaylist->meta()->findInt32("target-duration", &targetDuration);
+ int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration;
+ if (incr == 0) {
+ // increment mSeqNumber by at least one
+ incr = 1;
+ }
+ mSeqNumber += incr;
+ err = -EAGAIN;
+ break;
+ } else {
+ int64_t startTimeUs;
+ if (mStartTimeUsNotify != NULL
+ && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
+ mStartTimeUsNotify->setInt64(key, timeUs);
+
+ uint32_t streamMask = 0;
+ mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
+ streamMask |= mPacketSources.keyAt(i);
+ mStartTimeUsNotify->setInt32("streamMask", streamMask);
+
+ if (streamMask == mStreamTypeMask) {
+ mStartTimeUsNotify->post();
+ mStartTimeUsNotify.clear();
+ }
+ }
+ }
+ }
+
+ if (mStopParams != NULL) {
+ // Queue discontinuity in original stream.
+ int64_t stopTimeUs;
+ if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) {
+ packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
+ mStreamTypeMask &= ~stream;
+ mPacketSources.removeItemsAt(i);
+ break;
+ }
+ }
+
+ // Note that we do NOT dequeue any discontinuities except for format change.
// for simplicity, store a reference to the format in each unit
sp<MetaData> format = source->getFormat();
if (format != NULL) {
accessUnit->meta()->setObject("format", format);
}
+
+ // Stash the sequence number so we can hint future fetchers where to start at.
+ accessUnit->meta()->setInt32("seq", mSeqNumber);
packetSource->queueAccessUnit(accessUnit);
}
+
+ if (err != OK) {
+ break;
+ }
+ }
+
+ if (err != OK) {
+ for (size_t i = mPacketSources.size(); i-- > 0;) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+ packetSource->clear();
+ }
+ return err;
+ }
+
+ if (!mStreamTypeMask) {
+ // Signal gap is filled between original and new stream.
+ ALOGV("ERROR OUT OF RANGE");
+ return ERROR_OUT_OF_RANGE;
}
return OK;
@@ -908,6 +1094,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
CHECK(itemMeta->findInt64("durationUs", &durationUs));
buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
buffer->meta()->setInt64("durationUs", durationUs);
+ buffer->meta()->setInt32("seq", mSeqNumber);
packetSource->queueAccessUnit(buffer);
return OK;
@@ -1044,6 +1231,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
// Each AAC frame encodes 1024 samples.
numSamples += 1024;
+ unit->meta()->setInt32("seq", mSeqNumber);
packetSource->queueAccessUnit(unit);
offset += aac_frame_length;
@@ -1071,4 +1259,33 @@ void PlaylistFetcher::updateDuration() {
msg->post();
}
+int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
+ int64_t durationUs, threshold;
+ if (msg->findInt64("durationUs", &durationUs)) {
+ return kNumSkipFrames * durationUs;
+ }
+
+ sp<RefBase> obj;
+ msg->findObject("format", &obj);
+ MetaData *format = static_cast<MetaData *>(obj.get());
+
+ const char *mime;
+ CHECK(format->findCString(kKeyMIMEType, &mime));
+ bool audio = !strncasecmp(mime, "audio/", 6);
+ if (audio) {
+ // Assumes 1000 samples per frame.
+ int32_t sampleRate;
+ CHECK(format->findInt32(kKeySampleRate, &sampleRate));
+ return kNumSkipFrames /* frames */ * 1000 /* samples */
+ * (1000000 / sampleRate) /* sample duration (us) */;
+ } else {
+ int32_t frameRate;
+ if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
+ return kNumSkipFrames * (1000000 / frameRate);
+ }
+ }
+
+ return 500000ll;
+}
+
} // namespace android
diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h
index ac04a77..2e0349f 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.h
+++ b/media/libstagefright/httplive/PlaylistFetcher.h
@@ -43,6 +43,7 @@ struct PlaylistFetcher : public AHandler {
kWhatTemporarilyDoneFetching,
kWhatPrepared,
kWhatPreparationFailed,
+ kWhatStartedAt,
};
PlaylistFetcher(
@@ -56,12 +57,16 @@ struct PlaylistFetcher : public AHandler {
const sp<AnotherPacketSource> &audioSource,
const sp<AnotherPacketSource> &videoSource,
const sp<AnotherPacketSource> &subtitleSource,
- int64_t startTimeUs = -1ll);
+ int64_t startTimeUs = -1ll,
+ int64_t minStartTimeUs = 0ll /* start after this timestamp */,
+ int32_t startSeqNumberHint = -1 /* try starting at this sequence number */);
void pauseAsync();
void stopAsync();
+ void resumeUntilAsync(const sp<AMessage> &params);
+
protected:
virtual ~PlaylistFetcher();
virtual void onMessageReceived(const sp<AMessage> &msg);
@@ -76,17 +81,25 @@ private:
kWhatPause = 'paus',
kWhatStop = 'stop',
kWhatMonitorQueue = 'moni',
+ kWhatResumeUntil = 'rsme',
+ kWhatDownloadNext = 'dlnx',
};
static const int64_t kMinBufferedDurationUs;
static const int64_t kMaxMonitorDelayUs;
+ static const int32_t kNumSkipFrames;
+ // notifications to mSession
sp<AMessage> mNotify;
+ sp<AMessage> mStartTimeUsNotify;
+
sp<LiveSession> mSession;
AString mURI;
uint32_t mStreamTypeMask;
int64_t mStartTimeUs;
+ int64_t mMinStartTimeUs; // start fetching no earlier than this value
+ sp<AMessage> mStopParams; // message containing the latest timestamps we should fetch.
KeyedVector<LiveSession::StreamType, sp<AnotherPacketSource> >
mPacketSources;
@@ -153,6 +166,9 @@ private:
void onMonitorQueue();
void onDownloadNext();
+ // Resume a fetcher to continue until the stopping point stored in msg.
+ status_t onResumeUntil(const sp<AMessage> &msg);
+
status_t extractAndQueueAccessUnits(
const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta);
@@ -165,6 +181,10 @@ private:
void updateDuration();
+ // Before resuming a fetcher in onResume, check the remaining duration is longer than that
+ // returned by resumeThreshold.
+ int64_t resumeThreshold(const sp<AMessage> &msg);
+
DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher);
};