summaryrefslogtreecommitdiffstats
path: root/media/libstagefright/httplive/PlaylistFetcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'media/libstagefright/httplive/PlaylistFetcher.cpp')
-rw-r--r--media/libstagefright/httplive/PlaylistFetcher.cpp778
1 files changed, 603 insertions, 175 deletions
diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp
index 57bf7db..513f114 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.cpp
+++ b/media/libstagefright/httplive/PlaylistFetcher.cpp
@@ -48,26 +48,35 @@ namespace android {
// static
const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
+const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
+const int32_t PlaylistFetcher::kDownloadBlockSize = 192;
+const int32_t PlaylistFetcher::kNumSkipFrames = 10;
PlaylistFetcher::PlaylistFetcher(
const sp<AMessage> &notify,
const sp<LiveSession> &session,
const char *uri)
: mNotify(notify),
+ mStartTimeUsNotify(notify->dup()),
mSession(session),
mURI(uri),
mStreamTypeMask(0),
mStartTimeUs(-1ll),
+ mMinStartTimeUs(0ll),
+ mStopParams(NULL),
mLastPlaylistFetchTimeUs(-1ll),
mSeqNumber(-1),
mNumRetries(0),
mStartup(true),
+ mPrepared(false),
mNextPTSTimeUs(-1ll),
mMonitorQueueGeneration(0),
mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
mFirstPTSValid(false),
mAbsoluteTimeAnchorUs(0ll) {
memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
+ mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
+ mStartTimeUsNotify->setInt32("streamMask", 0);
}
PlaylistFetcher::~PlaylistFetcher() {
@@ -104,10 +113,16 @@ int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
return segmentStartUs;
}
-bool PlaylistFetcher::timeToRefreshPlaylist(int64_t nowUs) const {
- if (mPlaylist == NULL) {
+int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
+ int64_t nowUs = ALooper::GetNowUs();
+
+ if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
- return true;
+ return 0ll;
+ }
+
+ if (mPlaylist->isComplete()) {
+ return (~0llu >> 1);
}
int32_t targetDurationSecs;
@@ -158,11 +173,13 @@ bool PlaylistFetcher::timeToRefreshPlaylist(int64_t nowUs) const {
break;
}
- return mLastPlaylistFetchTimeUs + minPlaylistAgeUs <= nowUs;
+ int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
+ return delayUs > 0ll ? delayUs : 0ll;
}
status_t PlaylistFetcher::decryptBuffer(
- size_t playlistIndex, const sp<ABuffer> &buffer) {
+ size_t playlistIndex, const sp<ABuffer> &buffer,
+ bool first) {
sp<AMessage> itemMeta;
bool found = false;
AString method;
@@ -180,6 +197,7 @@ status_t PlaylistFetcher::decryptBuffer(
if (!found) {
method = "NONE";
}
+ buffer->meta()->setString("cipher-method", method.c_str());
if (method == "NONE") {
return OK;
@@ -200,9 +218,9 @@ status_t PlaylistFetcher::decryptBuffer(
if (index >= 0) {
key = mAESKeyForURI.valueAt(index);
} else {
- status_t err = mSession->fetchFile(keyURI.c_str(), &key);
+ ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
- if (err != OK) {
+ if (err < 0) {
ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
return ERROR_IO;
} else if (key->size() != 16) {
@@ -219,63 +237,89 @@ status_t PlaylistFetcher::decryptBuffer(
return UNKNOWN_ERROR;
}
- unsigned char aes_ivec[16];
+ size_t n = buffer->size();
+ if (!n) {
+ return OK;
+ }
+ CHECK(n % 16 == 0);
- AString iv;
- if (itemMeta->findString("cipher-iv", &iv)) {
- if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
- || iv.size() != 16 * 2 + 2) {
- ALOGE("malformed cipher IV '%s'.", iv.c_str());
- return ERROR_MALFORMED;
- }
+ if (first) {
+ // If decrypting the first block in a file, read the iv from the manifest
+ // or derive the iv from the file's sequence number.
- memset(aes_ivec, 0, sizeof(aes_ivec));
- for (size_t i = 0; i < 16; ++i) {
- char c1 = tolower(iv.c_str()[2 + 2 * i]);
- char c2 = tolower(iv.c_str()[3 + 2 * i]);
- if (!isxdigit(c1) || !isxdigit(c2)) {
+ AString iv;
+ if (itemMeta->findString("cipher-iv", &iv)) {
+ if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
+ || iv.size() != 16 * 2 + 2) {
ALOGE("malformed cipher IV '%s'.", iv.c_str());
return ERROR_MALFORMED;
}
- uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
- uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
- aes_ivec[i] = nibble1 << 4 | nibble2;
+ memset(mAESInitVec, 0, sizeof(mAESInitVec));
+ for (size_t i = 0; i < 16; ++i) {
+ char c1 = tolower(iv.c_str()[2 + 2 * i]);
+ char c2 = tolower(iv.c_str()[3 + 2 * i]);
+ if (!isxdigit(c1) || !isxdigit(c2)) {
+ ALOGE("malformed cipher IV '%s'.", iv.c_str());
+ return ERROR_MALFORMED;
+ }
+ uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
+ uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
+
+ mAESInitVec[i] = nibble1 << 4 | nibble2;
+ }
+ } else {
+ memset(mAESInitVec, 0, sizeof(mAESInitVec));
+ mAESInitVec[15] = mSeqNumber & 0xff;
+ mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
+ mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
+ mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
}
- } else {
- memset(aes_ivec, 0, sizeof(aes_ivec));
- aes_ivec[15] = mSeqNumber & 0xff;
- aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
- aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
- aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
}
AES_cbc_encrypt(
buffer->data(), buffer->data(), buffer->size(),
- &aes_key, aes_ivec, AES_DECRYPT);
-
- // hexdump(buffer->data(), buffer->size());
+ &aes_key, mAESInitVec, AES_DECRYPT);
- size_t n = buffer->size();
- CHECK_GT(n, 0u);
+ return OK;
+}
- size_t pad = buffer->data()[n - 1];
+status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
+ status_t err;
+ AString method;
+ CHECK(buffer->meta()->findString("cipher-method", &method));
+ if (method == "NONE") {
+ return OK;
+ }
- CHECK_GT(pad, 0u);
- CHECK_LE(pad, 16u);
- CHECK_GE((size_t)n, pad);
- for (size_t i = 0; i < pad; ++i) {
- CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
+ uint8_t padding = 0;
+ if (buffer->size() > 0) {
+ padding = buffer->data()[buffer->size() - 1];
}
- n -= pad;
+ if (padding > 16) {
+ return ERROR_MALFORMED;
+ }
- buffer->setRange(buffer->offset(), n);
+ for (size_t i = buffer->size() - padding; i < padding; i++) {
+ if (buffer->data()[i] != padding) {
+ return ERROR_MALFORMED;
+ }
+ }
+ buffer->setRange(buffer->offset(), buffer->size() - padding);
return OK;
}
-void PlaylistFetcher::postMonitorQueue(int64_t delayUs) {
+void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
+ int64_t maxDelayUs = delayUsToRefreshPlaylist();
+ if (maxDelayUs < minDelayUs) {
+ maxDelayUs = minDelayUs;
+ }
+ if (delayUs > maxDelayUs) {
+ ALOGV("Need to refresh playlist in %lld", maxDelayUs);
+ delayUs = maxDelayUs;
+ }
sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
msg->setInt32("generation", mMonitorQueueGeneration);
msg->post(delayUs);
@@ -289,7 +333,9 @@ void PlaylistFetcher::startAsync(
const sp<AnotherPacketSource> &audioSource,
const sp<AnotherPacketSource> &videoSource,
const sp<AnotherPacketSource> &subtitleSource,
- int64_t startTimeUs) {
+ int64_t startTimeUs,
+ int64_t minStartTimeUs,
+ int32_t startSeqNumberHint) {
sp<AMessage> msg = new AMessage(kWhatStart, id());
uint32_t streamTypeMask = 0ul;
@@ -311,6 +357,8 @@ void PlaylistFetcher::startAsync(
msg->setInt32("streamTypeMask", streamTypeMask);
msg->setInt64("startTimeUs", startTimeUs);
+ msg->setInt64("minStartTimeUs", minStartTimeUs);
+ msg->setInt32("startSeqNumberHint", startSeqNumberHint);
msg->post();
}
@@ -318,8 +366,16 @@ void PlaylistFetcher::pauseAsync() {
(new AMessage(kWhatPause, id()))->post();
}
-void PlaylistFetcher::stopAsync() {
- (new AMessage(kWhatStop, id()))->post();
+void PlaylistFetcher::stopAsync(bool selfTriggered) {
+ sp<AMessage> msg = new AMessage(kWhatStop, id());
+ msg->setInt32("selfTriggered", selfTriggered);
+ msg->post();
+}
+
+void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
+ AMessage* msg = new AMessage(kWhatResumeUntil, id());
+ msg->setMessage("params", params);
+ msg->post();
}
void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
@@ -347,7 +403,7 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
case kWhatStop:
{
- onStop();
+ onStop(msg);
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatStopped);
@@ -356,6 +412,7 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
}
case kWhatMonitorQueue:
+ case kWhatDownloadNext:
{
int32_t generation;
CHECK(msg->findInt32("generation", &generation));
@@ -365,7 +422,17 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
break;
}
- onMonitorQueue();
+ if (msg->what() == kWhatMonitorQueue) {
+ onMonitorQueue();
+ } else {
+ onDownloadNext();
+ }
+ break;
+ }
+
+ case kWhatResumeUntil:
+ {
+ onResumeUntil(msg);
break;
}
@@ -381,7 +448,10 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
int64_t startTimeUs;
+ int32_t startSeqNumberHint;
CHECK(msg->findInt64("startTimeUs", &startTimeUs));
+ CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs));
+ CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint));
if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
void *ptr;
@@ -416,6 +486,11 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
if (mStartTimeUs >= 0ll) {
mSeqNumber = -1;
mStartup = true;
+ mPrepared = false;
+ }
+
+ if (startSeqNumberHint >= 0) {
+ mSeqNumber = startSeqNumberHint;
}
postMonitorQueue();
@@ -425,22 +500,83 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
void PlaylistFetcher::onPause() {
cancelMonitorQueue();
-
- mPacketSources.clear();
- mStreamTypeMask = 0;
}
-void PlaylistFetcher::onStop() {
+void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
cancelMonitorQueue();
- for (size_t i = 0; i < mPacketSources.size(); ++i) {
- mPacketSources.valueAt(i)->clear();
+ int32_t selfTriggered;
+ CHECK(msg->findInt32("selfTriggered", &selfTriggered));
+ if (!selfTriggered) {
+ // Self triggered stops only happen during switching, in which case we do not want
+ // to clear the discontinuities queued at the end of packet sources.
+ for (size_t i = 0; i < mPacketSources.size(); i++) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+ packetSource->clear();
+ }
}
mPacketSources.clear();
mStreamTypeMask = 0;
}
+// Resume until we have reached the boundary timestamps listed in `msg`; when
+// the remaining time is too short (within a resume threshold) stop immediately
+// instead.
+status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
+ sp<AMessage> params;
+ CHECK(msg->findMessage("params", &params));
+
+ bool stop = false;
+ for (size_t i = 0; i < mPacketSources.size(); i++) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+
+ const char *stopKey;
+ int streamType = mPacketSources.keyAt(i);
+ switch (streamType) {
+ case LiveSession::STREAMTYPE_VIDEO:
+ stopKey = "timeUsVideo";
+ break;
+
+ case LiveSession::STREAMTYPE_AUDIO:
+ stopKey = "timeUsAudio";
+ break;
+
+ case LiveSession::STREAMTYPE_SUBTITLES:
+ stopKey = "timeUsSubtitle";
+ break;
+
+ default:
+ TRESPASS();
+ }
+
+ // Don't resume if we would stop within a resume threshold.
+ int64_t latestTimeUs = 0, stopTimeUs = 0;
+ sp<AMessage> latestMeta = packetSource->getLatestMeta();
+ if (latestMeta != NULL
+ && (latestMeta->findInt64("timeUs", &latestTimeUs)
+ && params->findInt64(stopKey, &stopTimeUs))) {
+ int64_t diffUs = stopTimeUs - latestTimeUs;
+ if (diffUs < resumeThreshold(latestMeta)) {
+ stop = true;
+ }
+ }
+ }
+
+ if (stop) {
+ for (size_t i = 0; i < mPacketSources.size(); i++) {
+ mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
+ }
+ stopAsync(/* selfTriggered = */ true);
+ return OK;
+ }
+
+ mStopParams = params;
+ postMonitorQueue();
+
+ return OK;
+}
+
void PlaylistFetcher::notifyError(status_t err) {
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatError);
@@ -457,41 +593,70 @@ void PlaylistFetcher::queueDiscontinuity(
void PlaylistFetcher::onMonitorQueue() {
bool downloadMore = false;
+ refreshPlaylist();
+
+ int32_t targetDurationSecs;
+ int64_t targetDurationUs = kMinBufferedDurationUs;
+ if (mPlaylist != NULL) {
+ CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
+ targetDurationUs = targetDurationSecs * 1000000ll;
+ }
- status_t finalResult;
+ // buffer at least 3 times the target duration, or up to 10 seconds
+ int64_t durationToBufferUs = targetDurationUs * 3;
+ if (durationToBufferUs > kMinBufferedDurationUs) {
+ durationToBufferUs = kMinBufferedDurationUs;
+ }
+
+ int64_t bufferedDurationUs = 0ll;
+ status_t finalResult = NOT_ENOUGH_DATA;
if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
sp<AnotherPacketSource> packetSource =
mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
- int64_t bufferedDurationUs =
+ bufferedDurationUs =
packetSource->getBufferedDurationUs(&finalResult);
-
- downloadMore = (bufferedDurationUs < kMinBufferedDurationUs);
finalResult = OK;
} else {
- bool first = true;
- int64_t minBufferedDurationUs = 0ll;
-
+ // Use max stream duration to prevent us from waiting on a non-existent stream;
+ // when we cannot make out from the manifest what streams are included in a playlist
+ // we might assume extra streams.
for (size_t i = 0; i < mPacketSources.size(); ++i) {
if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
continue;
}
- int64_t bufferedDurationUs =
+ int64_t bufferedStreamDurationUs =
mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
-
- if (first || bufferedDurationUs < minBufferedDurationUs) {
- minBufferedDurationUs = bufferedDurationUs;
- first = false;
+ ALOGV("buffered %lld for stream %d",
+ bufferedStreamDurationUs, mPacketSources.keyAt(i));
+ if (bufferedStreamDurationUs > bufferedDurationUs) {
+ bufferedDurationUs = bufferedStreamDurationUs;
}
}
+ }
+ downloadMore = (bufferedDurationUs < durationToBufferUs);
+
+ // signal start if buffered up at least the target size
+ if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
+ mPrepared = true;
- downloadMore =
- !first && (minBufferedDurationUs < kMinBufferedDurationUs);
+ ALOGV("prepared, buffered=%lld > %lld",
+ bufferedDurationUs, targetDurationUs);
+ sp<AMessage> msg = mNotify->dup();
+ msg->setInt32("what", kWhatTemporarilyDoneFetching);
+ msg->post();
}
if (finalResult == OK && downloadMore) {
- onDownloadNext();
+ ALOGV("monitoring, buffered=%lld < %lld",
+ bufferedDurationUs, durationToBufferUs);
+ // delay the next download slightly; hopefully this gives other concurrent fetchers
+ // a better chance to run.
+ // onDownloadNext();
+ sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
+ msg->setInt32("generation", mMonitorQueueGeneration);
+ msg->post(1000l);
} else {
// Nothing to do yet, try again in a second.
@@ -499,15 +664,17 @@ void PlaylistFetcher::onMonitorQueue() {
msg->setInt32("what", kWhatTemporarilyDoneFetching);
msg->post();
- postMonitorQueue(1000000ll);
+ int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
+ ALOGV("pausing for %lld, buffered=%lld > %lld",
+ delayUs, bufferedDurationUs, durationToBufferUs);
+ // :TRICKY: need to enforce minimum delay because the delay to
+ // refresh the playlist will become 0
+ postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
}
}
-void PlaylistFetcher::onDownloadNext() {
- int64_t nowUs = ALooper::GetNowUs();
-
- if (mLastPlaylistFetchTimeUs < 0ll
- || (!mPlaylist->isComplete() && timeToRefreshPlaylist(nowUs))) {
+status_t PlaylistFetcher::refreshPlaylist() {
+ if (delayUsToRefreshPlaylist() <= 0) {
bool unchanged;
sp<M3UParser> playlist = mSession->fetchPlaylist(
mURI.c_str(), mPlaylistHash, &unchanged);
@@ -523,7 +690,7 @@ void PlaylistFetcher::onDownloadNext() {
} else {
ALOGE("failed to load playlist at url '%s'", mURI.c_str());
notifyError(ERROR_IO);
- return;
+ return ERROR_IO;
}
} else {
mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
@@ -536,6 +703,18 @@ void PlaylistFetcher::onDownloadNext() {
mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
}
+ return OK;
+}
+
+// static
+bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
+ return buffer->size() > 0 && buffer->data()[0] == 0x47;
+}
+
+void PlaylistFetcher::onDownloadNext() {
+ if (refreshPlaylist() != OK) {
+ return;
+ }
int32_t firstSeqNumberInPlaylist;
if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
@@ -549,17 +728,29 @@ void PlaylistFetcher::onDownloadNext() {
const int32_t lastSeqNumberInPlaylist =
firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
+ if (mStartup && mSeqNumber >= 0
+ && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) {
+ // in case we guessed wrong during reconfiguration, try fetching the latest content.
+ mSeqNumber = lastSeqNumberInPlaylist;
+ }
+
if (mSeqNumber < 0) {
CHECK_GE(mStartTimeUs, 0ll);
if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
mSeqNumber = getSeqNumberForTime(mStartTimeUs);
+ ALOGV("Initial sequence number for time %lld is %ld from (%ld .. %ld)",
+ mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist);
} else {
// If this is a live session, start 3 segments from the end.
mSeqNumber = lastSeqNumberInPlaylist - 3;
if (mSeqNumber < firstSeqNumberInPlaylist) {
mSeqNumber = firstSeqNumberInPlaylist;
}
+ ALOGV("Initial sequence number for live event %ld from (%ld .. %ld)",
+ mSeqNumber, firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist);
}
mStartTimeUs = -1ll;
@@ -571,16 +762,34 @@ void PlaylistFetcher::onDownloadNext() {
++mNumRetries;
if (mSeqNumber > lastSeqNumberInPlaylist) {
- mLastPlaylistFetchTimeUs = -1;
- postMonitorQueue(3000000ll);
+ // refresh in increasing fraction (1/2, 1/3, ...) of the
+ // playlist's target duration or 3 seconds, whichever is less
+ int32_t targetDurationSecs;
+ CHECK(mPlaylist->meta()->findInt32(
+ "target-duration", &targetDurationSecs));
+ int64_t delayUs = mPlaylist->size() * targetDurationSecs *
+ 1000000ll / (1 + mNumRetries);
+ if (delayUs > kMaxMonitorDelayUs) {
+ delayUs = kMaxMonitorDelayUs;
+ }
+ ALOGV("sequence number high: %ld from (%ld .. %ld), monitor in %lld (retry=%d)",
+ mSeqNumber, firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist, delayUs, mNumRetries);
+ postMonitorQueue(delayUs);
return;
}
// we've missed the boat, let's start from the lowest sequence
// number available and signal a discontinuity.
- ALOGI("We've missed the boat, restarting playback.");
- mSeqNumber = lastSeqNumberInPlaylist;
+ ALOGI("We've missed the boat, restarting playback."
+ " mStartup=%d, was looking for %d in %d-%d",
+ mStartup, mSeqNumber, firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist);
+ mSeqNumber = lastSeqNumberInPlaylist - 3;
+ if (mSeqNumber < firstSeqNumberInPlaylist) {
+ mSeqNumber = firstSeqNumberInPlaylist;
+ }
explicitDiscontinuity = true;
// fall through
@@ -621,50 +830,160 @@ void PlaylistFetcher::onDownloadNext() {
ALOGV("fetching '%s'", uri.c_str());
- sp<ABuffer> buffer;
- status_t err = mSession->fetchFile(
- uri.c_str(), &buffer, range_offset, range_length);
-
- if (err != OK) {
- ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
- notifyError(err);
- return;
+ sp<DataSource> source;
+ sp<ABuffer> buffer, tsBuffer;
+ // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
+ // this avoids interleaved connections to the key and segment file.
+ {
+ sp<ABuffer> junk = new ABuffer(16);
+ junk->setRange(0, 16);
+ status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
+ true /* first */);
+ if (err != OK) {
+ notifyError(err);
+ return;
+ }
}
- CHECK(buffer != NULL);
+ // block-wise download
+ ssize_t bytesRead;
+ do {
+ bytesRead = mSession->fetchFile(
+ uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
+
+ if (bytesRead < 0) {
+ status_t err = bytesRead;
+ ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
+ notifyError(err);
+ return;
+ }
- err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
+ CHECK(buffer != NULL);
- if (err != OK) {
- ALOGE("decryptBuffer failed w/ error %d", err);
+ size_t size = buffer->size();
+ // Set decryption range.
+ buffer->setRange(size - bytesRead, bytesRead);
+ status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
+ buffer->offset() == 0 /* first */);
+ // Unset decryption range.
+ buffer->setRange(0, size);
- notifyError(err);
- return;
- }
+ if (err != OK) {
+ ALOGE("decryptBuffer failed w/ error %d", err);
- if (mStartup || seekDiscontinuity || explicitDiscontinuity) {
- // Signal discontinuity.
+ notifyError(err);
+ return;
+ }
- if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
- // If this was a live event this made no sense since
- // we don't have access to all the segment before the current
- // one.
- mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
+ if (mStartup || seekDiscontinuity || explicitDiscontinuity) {
+ // Signal discontinuity.
+
+ if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
+ // If this was a live event this made no sense since
+ // we don't have access to all the segment before the current
+ // one.
+ mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
+ }
+
+ if (seekDiscontinuity || explicitDiscontinuity) {
+ ALOGI("queueing discontinuity (seek=%d, explicit=%d)",
+ seekDiscontinuity, explicitDiscontinuity);
+
+ queueDiscontinuity(
+ explicitDiscontinuity
+ ? ATSParser::DISCONTINUITY_FORMATCHANGE
+ : ATSParser::DISCONTINUITY_SEEK,
+ NULL /* extra */);
+ }
+ }
+
+ err = OK;
+ if (bufferStartsWithTsSyncByte(buffer)) {
+ // Incremental extraction is only supported for MPEG2 transport streams.
+ if (tsBuffer == NULL) {
+ tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
+ tsBuffer->setRange(0, 0);
+ } else if (tsBuffer->capacity() != buffer->capacity()) {
+ size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
+ tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
+ tsBuffer->setRange(tsOff, tsSize);
+ }
+ tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
+
+ err = extractAndQueueAccessUnitsFromTs(tsBuffer);
+ }
+
+ if (err == -EAGAIN) {
+ // bad starting sequence number hint
+ postMonitorQueue();
+ return;
}
- if (seekDiscontinuity || explicitDiscontinuity) {
- ALOGI("queueing discontinuity (seek=%d, explicit=%d)",
- seekDiscontinuity, explicitDiscontinuity);
+ if (err == ERROR_OUT_OF_RANGE) {
+ // reached stopping point
+ stopAsync(/* selfTriggered = */ true);
+ return;
+ }
- queueDiscontinuity(
- explicitDiscontinuity
- ? ATSParser::DISCONTINUITY_FORMATCHANGE
- : ATSParser::DISCONTINUITY_SEEK,
- NULL /* extra */);
+ if (err != OK) {
+ notifyError(err);
+ return;
}
+
+ mStartup = false;
+ } while (bytesRead != 0);
+
+ if (bufferStartsWithTsSyncByte(buffer)) {
+ // If we still don't see a stream after fetching a full ts segment mark it as
+ // nonexistent.
+ const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
+ ATSParser::SourceType srcTypes[kNumTypes] =
+ { ATSParser::VIDEO, ATSParser::AUDIO };
+ LiveSession::StreamType streamTypes[kNumTypes] =
+ { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
+
+ for (size_t i = 0; i < kNumTypes; i++) {
+ ATSParser::SourceType srcType = srcTypes[i];
+ LiveSession::StreamType streamType = streamTypes[i];
+
+ sp<AnotherPacketSource> source =
+ static_cast<AnotherPacketSource *>(
+ mTSParser->getSource(srcType).get());
+
+ if (source == NULL) {
+ ALOGW("MPEG2 Transport stream does not contain %s data.",
+ srcType == ATSParser::VIDEO ? "video" : "audio");
+
+ mStreamTypeMask &= ~streamType;
+ mPacketSources.removeItem(streamType);
+ }
+ }
+
+ }
+
+ if (checkDecryptPadding(buffer) != OK) {
+ ALOGE("Incorrect padding bytes after decryption.");
+ notifyError(ERROR_MALFORMED);
+ return;
}
- err = extractAndQueueAccessUnits(buffer, itemMeta);
+ status_t err = OK;
+ if (tsBuffer != NULL) {
+ AString method;
+ CHECK(buffer->meta()->findString("cipher-method", &method));
+ if ((tsBuffer->size() > 0 && method == "NONE")
+ || tsBuffer->size() > 16) {
+ ALOGE("MPEG2 transport stream is not an even multiple of 188 "
+ "bytes in length.");
+ notifyError(ERROR_MALFORMED);
+ return;
+ }
+ }
+
+ // bulk extract non-ts files
+ if (tsBuffer == NULL) {
+ err = extractAndQueueAccessUnits(buffer, itemMeta);
+ }
if (err != OK) {
notifyError(err);
@@ -674,8 +993,6 @@ void PlaylistFetcher::onDownloadNext() {
++mSeqNumber;
postMonitorQueue();
-
- mStartup = false;
}
int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
@@ -710,95 +1027,163 @@ int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
return firstSeqNumberInPlaylist + index;
}
-status_t PlaylistFetcher::extractAndQueueAccessUnits(
- const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
- if (buffer->size() > 0 && buffer->data()[0] == 0x47) {
- // Let's assume this is an MPEG2 transport stream.
+status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
+ if (mTSParser == NULL) {
+ // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
+ mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
+ }
- if ((buffer->size() % 188) != 0) {
- ALOGE("MPEG2 transport stream is not an even multiple of 188 "
- "bytes in length.");
- return ERROR_MALFORMED;
- }
+ if (mNextPTSTimeUs >= 0ll) {
+ sp<AMessage> extra = new AMessage;
+ // Since we are using absolute timestamps, signal an offset of 0 to prevent
+ // ATSParser from skewing the timestamps of access units.
+ extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
- if (mTSParser == NULL) {
- mTSParser = new ATSParser;
- }
+ mTSParser->signalDiscontinuity(
+ ATSParser::DISCONTINUITY_SEEK, extra);
- if (mNextPTSTimeUs >= 0ll) {
- sp<AMessage> extra = new AMessage;
- extra->setInt64(IStreamListener::kKeyMediaTimeUs, mNextPTSTimeUs);
+ mNextPTSTimeUs = -1ll;
+ }
- mTSParser->signalDiscontinuity(
- ATSParser::DISCONTINUITY_SEEK, extra);
+ size_t offset = 0;
+ while (offset + 188 <= buffer->size()) {
+ status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
- mNextPTSTimeUs = -1ll;
+ if (err != OK) {
+ return err;
}
- size_t offset = 0;
- while (offset < buffer->size()) {
- status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
+ offset += 188;
+ }
+ // setRange to indicate consumed bytes.
+ buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
+
+ status_t err = OK;
+ for (size_t i = mPacketSources.size(); i-- > 0;) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+
+ const char *key;
+ ATSParser::SourceType type;
+ const LiveSession::StreamType stream = mPacketSources.keyAt(i);
+ switch (stream) {
+ case LiveSession::STREAMTYPE_VIDEO:
+ type = ATSParser::VIDEO;
+ key = "timeUsVideo";
+ break;
+
+ case LiveSession::STREAMTYPE_AUDIO:
+ type = ATSParser::AUDIO;
+ key = "timeUsAudio";
+ break;
- if (err != OK) {
- return err;
+ case LiveSession::STREAMTYPE_SUBTITLES:
+ {
+ ALOGE("MPEG2 Transport streams do not contain subtitles.");
+ return ERROR_MALFORMED;
+ break;
}
- offset += 188;
+ default:
+ TRESPASS();
}
- for (size_t i = mPacketSources.size(); i-- > 0;) {
- sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+ sp<AnotherPacketSource> source =
+ static_cast<AnotherPacketSource *>(
+ mTSParser->getSource(type).get());
- ATSParser::SourceType type;
- switch (mPacketSources.keyAt(i)) {
- case LiveSession::STREAMTYPE_VIDEO:
- type = ATSParser::VIDEO;
- break;
+ if (source == NULL) {
+ continue;
+ }
- case LiveSession::STREAMTYPE_AUDIO:
- type = ATSParser::AUDIO;
+ int64_t timeUs;
+ sp<ABuffer> accessUnit;
+ status_t finalResult;
+ while (source->hasBufferAvailable(&finalResult)
+ && source->dequeueAccessUnit(&accessUnit) == OK) {
+
+ CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
+ if (mMinStartTimeUs > 0) {
+ if (timeUs < mMinStartTimeUs) {
+ // TODO untested path
+ // try a later ts
+ int32_t targetDuration;
+ mPlaylist->meta()->findInt32("target-duration", &targetDuration);
+ int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration;
+ if (incr == 0) {
+ // increment mSeqNumber by at least one
+ incr = 1;
+ }
+ mSeqNumber += incr;
+ err = -EAGAIN;
break;
+ } else {
+ int64_t startTimeUs;
+ if (mStartTimeUsNotify != NULL
+ && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
+ mStartTimeUsNotify->setInt64(key, timeUs);
+
+ uint32_t streamMask = 0;
+ mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
+ streamMask |= mPacketSources.keyAt(i);
+ mStartTimeUsNotify->setInt32("streamMask", streamMask);
+
+ if (streamMask == mStreamTypeMask) {
+ mStartTimeUsNotify->post();
+ mStartTimeUsNotify.clear();
+ }
+ }
+ }
+ }
- case LiveSession::STREAMTYPE_SUBTITLES:
- {
- ALOGE("MPEG2 Transport streams do not contain subtitles.");
- return ERROR_MALFORMED;
+ if (mStopParams != NULL) {
+ // Queue discontinuity in original stream.
+ int64_t stopTimeUs;
+ if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) {
+ packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
+ mStreamTypeMask &= ~stream;
+ mPacketSources.removeItemsAt(i);
break;
}
-
- default:
- TRESPASS();
}
- sp<AnotherPacketSource> source =
- static_cast<AnotherPacketSource *>(
- mTSParser->getSource(type).get());
-
- if (source == NULL) {
- ALOGW("MPEG2 Transport stream does not contain %s data.",
- type == ATSParser::VIDEO ? "video" : "audio");
+ // Note that we do NOT dequeue any discontinuities except for format change.
- mStreamTypeMask &= ~mPacketSources.keyAt(i);
- mPacketSources.removeItemsAt(i);
- continue;
+ // for simplicity, store a reference to the format in each unit
+ sp<MetaData> format = source->getFormat();
+ if (format != NULL) {
+ accessUnit->meta()->setObject("format", format);
}
- sp<ABuffer> accessUnit;
- status_t finalResult;
- while (source->hasBufferAvailable(&finalResult)
- && source->dequeueAccessUnit(&accessUnit) == OK) {
- // Note that we do NOT dequeue any discontinuities.
+ // Stash the sequence number so we can hint future playlist where to start at.
+ accessUnit->meta()->setInt32("seq", mSeqNumber);
+ packetSource->queueAccessUnit(accessUnit);
+ }
- packetSource->queueAccessUnit(accessUnit);
- }
+ if (err != OK) {
+ break;
+ }
+ }
- if (packetSource->getFormat() == NULL) {
- packetSource->setFormat(source->getFormat());
- }
+ if (err != OK) {
+ for (size_t i = mPacketSources.size(); i-- > 0;) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+ packetSource->clear();
}
+ return err;
+ }
- return OK;
- } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) {
+ if (!mStreamTypeMask) {
+ // Signal gap is filled between original and new stream.
+ ALOGV("ERROR OUT OF RANGE");
+ return ERROR_OUT_OF_RANGE;
+ }
+
+ return OK;
+}
+
+status_t PlaylistFetcher::extractAndQueueAccessUnits(
+ const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
+ if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) {
if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
ALOGE("This stream only contains subtitles.");
return ERROR_MALFORMED;
@@ -811,6 +1196,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
CHECK(itemMeta->findInt64("durationUs", &durationUs));
buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
buffer->meta()->setInt64("durationUs", durationUs);
+ buffer->meta()->setInt32("seq", mSeqNumber);
packetSource->queueAccessUnit(buffer);
return OK;
@@ -936,6 +1322,18 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
| (adtsHeader[4] << 3)
| (adtsHeader[5] >> 5);
+ if (aac_frame_length == 0) {
+ const uint8_t *id3Header = adtsHeader;
+ if (!memcmp(id3Header, "ID3", 3)) {
+ ID3 id3(id3Header, buffer->size() - offset, true);
+ if (id3.isValid()) {
+ offset += id3.rawSize();
+ continue;
+ };
+ }
+ return ERROR_MALFORMED;
+ }
+
CHECK_LE(offset + aac_frame_length, buffer->size());
sp<ABuffer> unit = new ABuffer(aac_frame_length);
@@ -947,6 +1345,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
// Each AAC frame encodes 1024 samples.
numSamples += 1024;
+ unit->meta()->setInt32("seq", mSeqNumber);
packetSource->queueAccessUnit(unit);
offset += aac_frame_length;
@@ -974,4 +1373,33 @@ void PlaylistFetcher::updateDuration() {
msg->post();
}
+int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
+ int64_t durationUs, threshold;
+ if (msg->findInt64("durationUs", &durationUs)) {
+ return kNumSkipFrames * durationUs;
+ }
+
+ sp<RefBase> obj;
+ msg->findObject("format", &obj);
+ MetaData *format = static_cast<MetaData *>(obj.get());
+
+ const char *mime;
+ CHECK(format->findCString(kKeyMIMEType, &mime));
+ bool audio = !strncasecmp(mime, "audio/", 6);
+ if (audio) {
+ // Assumes 1000 samples per frame.
+ int32_t sampleRate;
+ CHECK(format->findInt32(kKeySampleRate, &sampleRate));
+ return kNumSkipFrames /* frames */ * 1000 /* samples */
+ * (1000000 / sampleRate) /* sample duration (us) */;
+ } else {
+ int32_t frameRate;
+ if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
+ return kNumSkipFrames * (1000000 / frameRate);
+ }
+ }
+
+ return 500000ll;
+}
+
} // namespace android