diff options
Diffstat (limited to 'media/libstagefright/httplive/PlaylistFetcher.cpp')
-rw-r--r-- | media/libstagefright/httplive/PlaylistFetcher.cpp | 778 |
1 files changed, 603 insertions, 175 deletions
diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp index 57bf7db..513f114 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.cpp +++ b/media/libstagefright/httplive/PlaylistFetcher.cpp @@ -48,26 +48,35 @@ namespace android { // static const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; +const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; +const int32_t PlaylistFetcher::kDownloadBlockSize = 192; +const int32_t PlaylistFetcher::kNumSkipFrames = 10; PlaylistFetcher::PlaylistFetcher( const sp<AMessage> ¬ify, const sp<LiveSession> &session, const char *uri) : mNotify(notify), + mStartTimeUsNotify(notify->dup()), mSession(session), mURI(uri), mStreamTypeMask(0), mStartTimeUs(-1ll), + mMinStartTimeUs(0ll), + mStopParams(NULL), mLastPlaylistFetchTimeUs(-1ll), mSeqNumber(-1), mNumRetries(0), mStartup(true), + mPrepared(false), mNextPTSTimeUs(-1ll), mMonitorQueueGeneration(0), mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), mFirstPTSValid(false), mAbsoluteTimeAnchorUs(0ll) { memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); + mStartTimeUsNotify->setInt32("what", kWhatStartedAt); + mStartTimeUsNotify->setInt32("streamMask", 0); } PlaylistFetcher::~PlaylistFetcher() { @@ -104,10 +113,16 @@ int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { return segmentStartUs; } -bool PlaylistFetcher::timeToRefreshPlaylist(int64_t nowUs) const { - if (mPlaylist == NULL) { +int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { + int64_t nowUs = ALooper::GetNowUs(); + + if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); - return true; + return 0ll; + } + + if (mPlaylist->isComplete()) { + return (~0llu >> 1); } int32_t targetDurationSecs; @@ -158,11 +173,13 @@ bool PlaylistFetcher::timeToRefreshPlaylist(int64_t nowUs) const { break; } - return mLastPlaylistFetchTimeUs + minPlaylistAgeUs <= nowUs; + int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; + return delayUs > 0ll ? delayUs : 0ll; } status_t PlaylistFetcher::decryptBuffer( - size_t playlistIndex, const sp<ABuffer> &buffer) { + size_t playlistIndex, const sp<ABuffer> &buffer, + bool first) { sp<AMessage> itemMeta; bool found = false; AString method; @@ -180,6 +197,7 @@ status_t PlaylistFetcher::decryptBuffer( if (!found) { method = "NONE"; } + buffer->meta()->setString("cipher-method", method.c_str()); if (method == "NONE") { return OK; @@ -200,9 +218,9 @@ status_t PlaylistFetcher::decryptBuffer( if (index >= 0) { key = mAESKeyForURI.valueAt(index); } else { - status_t err = mSession->fetchFile(keyURI.c_str(), &key); + ssize_t err = mSession->fetchFile(keyURI.c_str(), &key); - if (err != OK) { + if (err < 0) { ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); return ERROR_IO; } else if (key->size() != 16) { @@ -219,63 +237,89 @@ status_t PlaylistFetcher::decryptBuffer( return UNKNOWN_ERROR; } - unsigned char aes_ivec[16]; + size_t n = buffer->size(); + if (!n) { + return OK; + } + CHECK(n % 16 == 0); - AString iv; - if (itemMeta->findString("cipher-iv", &iv)) { - if ((!iv.startsWith("0x") && !iv.startsWith("0X")) - || iv.size() != 16 * 2 + 2) { - ALOGE("malformed cipher IV '%s'.", iv.c_str()); - return ERROR_MALFORMED; - } + if (first) { + // If decrypting the first block in a file, read the iv from the manifest + // or derive the iv from the file's sequence number. - memset(aes_ivec, 0, sizeof(aes_ivec)); - for (size_t i = 0; i < 16; ++i) { - char c1 = tolower(iv.c_str()[2 + 2 * i]); - char c2 = tolower(iv.c_str()[3 + 2 * i]); - if (!isxdigit(c1) || !isxdigit(c2)) { + AString iv; + if (itemMeta->findString("cipher-iv", &iv)) { + if ((!iv.startsWith("0x") && !iv.startsWith("0X")) + || iv.size() != 16 * 2 + 2) { ALOGE("malformed cipher IV '%s'.", iv.c_str()); return ERROR_MALFORMED; } - uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; - uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; - aes_ivec[i] = nibble1 << 4 | nibble2; + memset(mAESInitVec, 0, sizeof(mAESInitVec)); + for (size_t i = 0; i < 16; ++i) { + char c1 = tolower(iv.c_str()[2 + 2 * i]); + char c2 = tolower(iv.c_str()[3 + 2 * i]); + if (!isxdigit(c1) || !isxdigit(c2)) { + ALOGE("malformed cipher IV '%s'.", iv.c_str()); + return ERROR_MALFORMED; + } + uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; + uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; + + mAESInitVec[i] = nibble1 << 4 | nibble2; + } + } else { + memset(mAESInitVec, 0, sizeof(mAESInitVec)); + mAESInitVec[15] = mSeqNumber & 0xff; + mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; + mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; + mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; } - } else { - memset(aes_ivec, 0, sizeof(aes_ivec)); - aes_ivec[15] = mSeqNumber & 0xff; - aes_ivec[14] = (mSeqNumber >> 8) & 0xff; - aes_ivec[13] = (mSeqNumber >> 16) & 0xff; - aes_ivec[12] = (mSeqNumber >> 24) & 0xff; } AES_cbc_encrypt( buffer->data(), buffer->data(), buffer->size(), - &aes_key, aes_ivec, AES_DECRYPT); - - // hexdump(buffer->data(), buffer->size()); + &aes_key, mAESInitVec, AES_DECRYPT); - size_t n = buffer->size(); - CHECK_GT(n, 0u); + return OK; +} - size_t pad = buffer->data()[n - 1]; +status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) { + status_t err; + AString method; + CHECK(buffer->meta()->findString("cipher-method", &method)); + if (method == "NONE") { + return OK; + } - CHECK_GT(pad, 0u); - CHECK_LE(pad, 16u); - CHECK_GE((size_t)n, pad); - for (size_t i = 0; i < pad; ++i) { - CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad); + uint8_t padding = 0; + if (buffer->size() > 0) { + padding = buffer->data()[buffer->size() - 1]; } - n -= pad; + if (padding > 16) { + return ERROR_MALFORMED; + } - buffer->setRange(buffer->offset(), n); + for (size_t i = buffer->size() - padding; i < padding; i++) { + if (buffer->data()[i] != padding) { + return ERROR_MALFORMED; + } + } + buffer->setRange(buffer->offset(), buffer->size() - padding); return OK; } -void PlaylistFetcher::postMonitorQueue(int64_t delayUs) { +void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { + int64_t maxDelayUs = delayUsToRefreshPlaylist(); + if (maxDelayUs < minDelayUs) { + maxDelayUs = minDelayUs; + } + if (delayUs > maxDelayUs) { + ALOGV("Need to refresh playlist in %lld", maxDelayUs); + delayUs = maxDelayUs; + } sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); msg->setInt32("generation", mMonitorQueueGeneration); msg->post(delayUs); @@ -289,7 +333,9 @@ void PlaylistFetcher::startAsync( const sp<AnotherPacketSource> &audioSource, const sp<AnotherPacketSource> &videoSource, const sp<AnotherPacketSource> &subtitleSource, - int64_t startTimeUs) { + int64_t startTimeUs, + int64_t minStartTimeUs, + int32_t startSeqNumberHint) { sp<AMessage> msg = new AMessage(kWhatStart, id()); uint32_t streamTypeMask = 0ul; @@ -311,6 +357,8 @@ void PlaylistFetcher::startAsync( msg->setInt32("streamTypeMask", streamTypeMask); msg->setInt64("startTimeUs", startTimeUs); + msg->setInt64("minStartTimeUs", minStartTimeUs); + msg->setInt32("startSeqNumberHint", startSeqNumberHint); msg->post(); } @@ -318,8 +366,16 @@ void PlaylistFetcher::pauseAsync() { (new AMessage(kWhatPause, id()))->post(); } -void PlaylistFetcher::stopAsync() { - (new AMessage(kWhatStop, id()))->post(); +void PlaylistFetcher::stopAsync(bool selfTriggered) { + sp<AMessage> msg = new AMessage(kWhatStop, id()); + msg->setInt32("selfTriggered", selfTriggered); + msg->post(); +} + +void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) { + AMessage* msg = new AMessage(kWhatResumeUntil, id()); + msg->setMessage("params", params); + msg->post(); } void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { @@ -347,7 +403,7 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { case kWhatStop: { - onStop(); + onStop(msg); sp<AMessage> notify = mNotify->dup(); notify->setInt32("what", kWhatStopped); @@ -356,6 +412,7 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { } case kWhatMonitorQueue: + case kWhatDownloadNext: { int32_t generation; CHECK(msg->findInt32("generation", &generation)); @@ -365,7 +422,17 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { break; } - onMonitorQueue(); + if (msg->what() == kWhatMonitorQueue) { + onMonitorQueue(); + } else { + onDownloadNext(); + } + break; + } + + case kWhatResumeUntil: + { + onResumeUntil(msg); break; } @@ -381,7 +448,10 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); int64_t startTimeUs; + int32_t startSeqNumberHint; CHECK(msg->findInt64("startTimeUs", &startTimeUs)); + CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs)); + CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint)); if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { void *ptr; @@ -416,6 +486,11 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { if (mStartTimeUs >= 0ll) { mSeqNumber = -1; mStartup = true; + mPrepared = false; + } + + if (startSeqNumberHint >= 0) { + mSeqNumber = startSeqNumberHint; } postMonitorQueue(); @@ -425,22 +500,83 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { void PlaylistFetcher::onPause() { cancelMonitorQueue(); - - mPacketSources.clear(); - mStreamTypeMask = 0; } -void PlaylistFetcher::onStop() { +void PlaylistFetcher::onStop(const sp<AMessage> &msg) { cancelMonitorQueue(); - for (size_t i = 0; i < mPacketSources.size(); ++i) { - mPacketSources.valueAt(i)->clear(); + int32_t selfTriggered; + CHECK(msg->findInt32("selfTriggered", &selfTriggered)); + if (!selfTriggered) { + // Self triggered stops only happen during switching, in which case we do not want + // to clear the discontinuities queued at the end of packet sources. + for (size_t i = 0; i < mPacketSources.size(); i++) { + sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); + packetSource->clear(); + } } mPacketSources.clear(); mStreamTypeMask = 0; } +// Resume until we have reached the boundary timestamps listed in `msg`; when +// the remaining time is too short (within a resume threshold) stop immediately +// instead. +status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { + sp<AMessage> params; + CHECK(msg->findMessage("params", ¶ms)); + + bool stop = false; + for (size_t i = 0; i < mPacketSources.size(); i++) { + sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); + + const char *stopKey; + int streamType = mPacketSources.keyAt(i); + switch (streamType) { + case LiveSession::STREAMTYPE_VIDEO: + stopKey = "timeUsVideo"; + break; + + case LiveSession::STREAMTYPE_AUDIO: + stopKey = "timeUsAudio"; + break; + + case LiveSession::STREAMTYPE_SUBTITLES: + stopKey = "timeUsSubtitle"; + break; + + default: + TRESPASS(); + } + + // Don't resume if we would stop within a resume threshold. + int64_t latestTimeUs = 0, stopTimeUs = 0; + sp<AMessage> latestMeta = packetSource->getLatestMeta(); + if (latestMeta != NULL + && (latestMeta->findInt64("timeUs", &latestTimeUs) + && params->findInt64(stopKey, &stopTimeUs))) { + int64_t diffUs = stopTimeUs - latestTimeUs; + if (diffUs < resumeThreshold(latestMeta)) { + stop = true; + } + } + } + + if (stop) { + for (size_t i = 0; i < mPacketSources.size(); i++) { + mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); + } + stopAsync(/* selfTriggered = */ true); + return OK; + } + + mStopParams = params; + postMonitorQueue(); + + return OK; +} + void PlaylistFetcher::notifyError(status_t err) { sp<AMessage> notify = mNotify->dup(); notify->setInt32("what", kWhatError); @@ -457,41 +593,70 @@ void PlaylistFetcher::queueDiscontinuity( void PlaylistFetcher::onMonitorQueue() { bool downloadMore = false; + refreshPlaylist(); + + int32_t targetDurationSecs; + int64_t targetDurationUs = kMinBufferedDurationUs; + if (mPlaylist != NULL) { + CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); + targetDurationUs = targetDurationSecs * 1000000ll; + } - status_t finalResult; + // buffer at least 3 times the target duration, or up to 10 seconds + int64_t durationToBufferUs = targetDurationUs * 3; + if (durationToBufferUs > kMinBufferedDurationUs) { + durationToBufferUs = kMinBufferedDurationUs; + } + + int64_t bufferedDurationUs = 0ll; + status_t finalResult = NOT_ENOUGH_DATA; if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); - int64_t bufferedDurationUs = + bufferedDurationUs = packetSource->getBufferedDurationUs(&finalResult); - - downloadMore = (bufferedDurationUs < kMinBufferedDurationUs); finalResult = OK; } else { - bool first = true; - int64_t minBufferedDurationUs = 0ll; - + // Use max stream duration to prevent us from waiting on a non-existent stream; + // when we cannot make out from the manifest what streams are included in a playlist + // we might assume extra streams. for (size_t i = 0; i < mPacketSources.size(); ++i) { if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { continue; } - int64_t bufferedDurationUs = + int64_t bufferedStreamDurationUs = mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); - - if (first || bufferedDurationUs < minBufferedDurationUs) { - minBufferedDurationUs = bufferedDurationUs; - first = false; + ALOGV("buffered %lld for stream %d", + bufferedStreamDurationUs, mPacketSources.keyAt(i)); + if (bufferedStreamDurationUs > bufferedDurationUs) { + bufferedDurationUs = bufferedStreamDurationUs; } } + } + downloadMore = (bufferedDurationUs < durationToBufferUs); + + // signal start if buffered up at least the target size + if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) { + mPrepared = true; - downloadMore = - !first && (minBufferedDurationUs < kMinBufferedDurationUs); + ALOGV("prepared, buffered=%lld > %lld", + bufferedDurationUs, targetDurationUs); + sp<AMessage> msg = mNotify->dup(); + msg->setInt32("what", kWhatTemporarilyDoneFetching); + msg->post(); } if (finalResult == OK && downloadMore) { - onDownloadNext(); + ALOGV("monitoring, buffered=%lld < %lld", + bufferedDurationUs, durationToBufferUs); + // delay the next download slightly; hopefully this gives other concurrent fetchers + // a better chance to run. + // onDownloadNext(); + sp<AMessage> msg = new AMessage(kWhatDownloadNext, id()); + msg->setInt32("generation", mMonitorQueueGeneration); + msg->post(1000l); } else { // Nothing to do yet, try again in a second. @@ -499,15 +664,17 @@ void PlaylistFetcher::onMonitorQueue() { msg->setInt32("what", kWhatTemporarilyDoneFetching); msg->post(); - postMonitorQueue(1000000ll); + int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; + ALOGV("pausing for %lld, buffered=%lld > %lld", + delayUs, bufferedDurationUs, durationToBufferUs); + // :TRICKY: need to enforce minimum delay because the delay to + // refresh the playlist will become 0 + postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0); } } -void PlaylistFetcher::onDownloadNext() { - int64_t nowUs = ALooper::GetNowUs(); - - if (mLastPlaylistFetchTimeUs < 0ll - || (!mPlaylist->isComplete() && timeToRefreshPlaylist(nowUs))) { +status_t PlaylistFetcher::refreshPlaylist() { + if (delayUsToRefreshPlaylist() <= 0) { bool unchanged; sp<M3UParser> playlist = mSession->fetchPlaylist( mURI.c_str(), mPlaylistHash, &unchanged); @@ -523,7 +690,7 @@ void PlaylistFetcher::onDownloadNext() { } else { ALOGE("failed to load playlist at url '%s'", mURI.c_str()); notifyError(ERROR_IO); - return; + return ERROR_IO; } } else { mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; @@ -536,6 +703,18 @@ void PlaylistFetcher::onDownloadNext() { mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); } + return OK; +} + +// static +bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) { + return buffer->size() > 0 && buffer->data()[0] == 0x47; +} + +void PlaylistFetcher::onDownloadNext() { + if (refreshPlaylist() != OK) { + return; + } int32_t firstSeqNumberInPlaylist; if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( @@ -549,17 +728,29 @@ void PlaylistFetcher::onDownloadNext() { const int32_t lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; + if (mStartup && mSeqNumber >= 0 + && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) { + // in case we guessed wrong during reconfiguration, try fetching the latest content. + mSeqNumber = lastSeqNumberInPlaylist; + } + if (mSeqNumber < 0) { CHECK_GE(mStartTimeUs, 0ll); if (mPlaylist->isComplete() || mPlaylist->isEvent()) { mSeqNumber = getSeqNumberForTime(mStartTimeUs); + ALOGV("Initial sequence number for time %lld is %ld from (%ld .. %ld)", + mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, + lastSeqNumberInPlaylist); } else { // If this is a live session, start 3 segments from the end. mSeqNumber = lastSeqNumberInPlaylist - 3; if (mSeqNumber < firstSeqNumberInPlaylist) { mSeqNumber = firstSeqNumberInPlaylist; } + ALOGV("Initial sequence number for live event %ld from (%ld .. %ld)", + mSeqNumber, firstSeqNumberInPlaylist, + lastSeqNumberInPlaylist); } mStartTimeUs = -1ll; @@ -571,16 +762,34 @@ void PlaylistFetcher::onDownloadNext() { ++mNumRetries; if (mSeqNumber > lastSeqNumberInPlaylist) { - mLastPlaylistFetchTimeUs = -1; - postMonitorQueue(3000000ll); + // refresh in increasing fraction (1/2, 1/3, ...) of the + // playlist's target duration or 3 seconds, whichever is less + int32_t targetDurationSecs; + CHECK(mPlaylist->meta()->findInt32( + "target-duration", &targetDurationSecs)); + int64_t delayUs = mPlaylist->size() * targetDurationSecs * + 1000000ll / (1 + mNumRetries); + if (delayUs > kMaxMonitorDelayUs) { + delayUs = kMaxMonitorDelayUs; + } + ALOGV("sequence number high: %ld from (%ld .. %ld), monitor in %lld (retry=%d)", + mSeqNumber, firstSeqNumberInPlaylist, + lastSeqNumberInPlaylist, delayUs, mNumRetries); + postMonitorQueue(delayUs); return; } // we've missed the boat, let's start from the lowest sequence // number available and signal a discontinuity. - ALOGI("We've missed the boat, restarting playback."); - mSeqNumber = lastSeqNumberInPlaylist; + ALOGI("We've missed the boat, restarting playback." + " mStartup=%d, was looking for %d in %d-%d", + mStartup, mSeqNumber, firstSeqNumberInPlaylist, + lastSeqNumberInPlaylist); + mSeqNumber = lastSeqNumberInPlaylist - 3; + if (mSeqNumber < firstSeqNumberInPlaylist) { + mSeqNumber = firstSeqNumberInPlaylist; + } explicitDiscontinuity = true; // fall through @@ -621,50 +830,160 @@ void PlaylistFetcher::onDownloadNext() { ALOGV("fetching '%s'", uri.c_str()); - sp<ABuffer> buffer; - status_t err = mSession->fetchFile( - uri.c_str(), &buffer, range_offset, range_length); - - if (err != OK) { - ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); - notifyError(err); - return; + sp<DataSource> source; + sp<ABuffer> buffer, tsBuffer; + // decrypt a junk buffer to prefetch key; since a session uses only one http connection, + // this avoids interleaved connections to the key and segment file. + { + sp<ABuffer> junk = new ABuffer(16); + junk->setRange(0, 16); + status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk, + true /* first */); + if (err != OK) { + notifyError(err); + return; + } } - CHECK(buffer != NULL); + // block-wise download + ssize_t bytesRead; + do { + bytesRead = mSession->fetchFile( + uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source); + + if (bytesRead < 0) { + status_t err = bytesRead; + ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); + notifyError(err); + return; + } - err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer); + CHECK(buffer != NULL); - if (err != OK) { - ALOGE("decryptBuffer failed w/ error %d", err); + size_t size = buffer->size(); + // Set decryption range. + buffer->setRange(size - bytesRead, bytesRead); + status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer, + buffer->offset() == 0 /* first */); + // Unset decryption range. + buffer->setRange(0, size); - notifyError(err); - return; - } + if (err != OK) { + ALOGE("decryptBuffer failed w/ error %d", err); - if (mStartup || seekDiscontinuity || explicitDiscontinuity) { - // Signal discontinuity. + notifyError(err); + return; + } - if (mPlaylist->isComplete() || mPlaylist->isEvent()) { - // If this was a live event this made no sense since - // we don't have access to all the segment before the current - // one. - mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); + if (mStartup || seekDiscontinuity || explicitDiscontinuity) { + // Signal discontinuity. + + if (mPlaylist->isComplete() || mPlaylist->isEvent()) { + // If this was a live event this made no sense since + // we don't have access to all the segment before the current + // one. + mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); + } + + if (seekDiscontinuity || explicitDiscontinuity) { + ALOGI("queueing discontinuity (seek=%d, explicit=%d)", + seekDiscontinuity, explicitDiscontinuity); + + queueDiscontinuity( + explicitDiscontinuity + ? ATSParser::DISCONTINUITY_FORMATCHANGE + : ATSParser::DISCONTINUITY_SEEK, + NULL /* extra */); + } + } + + err = OK; + if (bufferStartsWithTsSyncByte(buffer)) { + // Incremental extraction is only supported for MPEG2 transport streams. + if (tsBuffer == NULL) { + tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); + tsBuffer->setRange(0, 0); + } else if (tsBuffer->capacity() != buffer->capacity()) { + size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); + tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); + tsBuffer->setRange(tsOff, tsSize); + } + tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); + + err = extractAndQueueAccessUnitsFromTs(tsBuffer); + } + + if (err == -EAGAIN) { + // bad starting sequence number hint + postMonitorQueue(); + return; } - if (seekDiscontinuity || explicitDiscontinuity) { - ALOGI("queueing discontinuity (seek=%d, explicit=%d)", - seekDiscontinuity, explicitDiscontinuity); + if (err == ERROR_OUT_OF_RANGE) { + // reached stopping point + stopAsync(/* selfTriggered = */ true); + return; + } - queueDiscontinuity( - explicitDiscontinuity - ? ATSParser::DISCONTINUITY_FORMATCHANGE - : ATSParser::DISCONTINUITY_SEEK, - NULL /* extra */); + if (err != OK) { + notifyError(err); + return; } + + mStartup = false; + } while (bytesRead != 0); + + if (bufferStartsWithTsSyncByte(buffer)) { + // If we still don't see a stream after fetching a full ts segment mark it as + // nonexistent. + const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES; + ATSParser::SourceType srcTypes[kNumTypes] = + { ATSParser::VIDEO, ATSParser::AUDIO }; + LiveSession::StreamType streamTypes[kNumTypes] = + { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO }; + + for (size_t i = 0; i < kNumTypes; i++) { + ATSParser::SourceType srcType = srcTypes[i]; + LiveSession::StreamType streamType = streamTypes[i]; + + sp<AnotherPacketSource> source = + static_cast<AnotherPacketSource *>( + mTSParser->getSource(srcType).get()); + + if (source == NULL) { + ALOGW("MPEG2 Transport stream does not contain %s data.", + srcType == ATSParser::VIDEO ? "video" : "audio"); + + mStreamTypeMask &= ~streamType; + mPacketSources.removeItem(streamType); + } + } + + } + + if (checkDecryptPadding(buffer) != OK) { + ALOGE("Incorrect padding bytes after decryption."); + notifyError(ERROR_MALFORMED); + return; } - err = extractAndQueueAccessUnits(buffer, itemMeta); + status_t err = OK; + if (tsBuffer != NULL) { + AString method; + CHECK(buffer->meta()->findString("cipher-method", &method)); + if ((tsBuffer->size() > 0 && method == "NONE") + || tsBuffer->size() > 16) { + ALOGE("MPEG2 transport stream is not an even multiple of 188 " + "bytes in length."); + notifyError(ERROR_MALFORMED); + return; + } + } + + // bulk extract non-ts files + if (tsBuffer == NULL) { + err = extractAndQueueAccessUnits(buffer, itemMeta); + } if (err != OK) { notifyError(err); @@ -674,8 +993,6 @@ void PlaylistFetcher::onDownloadNext() { ++mSeqNumber; postMonitorQueue(); - - mStartup = false; } int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { @@ -710,95 +1027,163 @@ int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { return firstSeqNumberInPlaylist + index; } -status_t PlaylistFetcher::extractAndQueueAccessUnits( - const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { - if (buffer->size() > 0 && buffer->data()[0] == 0x47) { - // Let's assume this is an MPEG2 transport stream. +status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { + if (mTSParser == NULL) { + // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. + mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); + } - if ((buffer->size() % 188) != 0) { - ALOGE("MPEG2 transport stream is not an even multiple of 188 " - "bytes in length."); - return ERROR_MALFORMED; - } + if (mNextPTSTimeUs >= 0ll) { + sp<AMessage> extra = new AMessage; + // Since we are using absolute timestamps, signal an offset of 0 to prevent + // ATSParser from skewing the timestamps of access units. + extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); - if (mTSParser == NULL) { - mTSParser = new ATSParser; - } + mTSParser->signalDiscontinuity( + ATSParser::DISCONTINUITY_SEEK, extra); - if (mNextPTSTimeUs >= 0ll) { - sp<AMessage> extra = new AMessage; - extra->setInt64(IStreamListener::kKeyMediaTimeUs, mNextPTSTimeUs); + mNextPTSTimeUs = -1ll; + } - mTSParser->signalDiscontinuity( - ATSParser::DISCONTINUITY_SEEK, extra); + size_t offset = 0; + while (offset + 188 <= buffer->size()) { + status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); - mNextPTSTimeUs = -1ll; + if (err != OK) { + return err; } - size_t offset = 0; - while (offset < buffer->size()) { - status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); + offset += 188; + } + // setRange to indicate consumed bytes. + buffer->setRange(buffer->offset() + offset, buffer->size() - offset); + + status_t err = OK; + for (size_t i = mPacketSources.size(); i-- > 0;) { + sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); + + const char *key; + ATSParser::SourceType type; + const LiveSession::StreamType stream = mPacketSources.keyAt(i); + switch (stream) { + case LiveSession::STREAMTYPE_VIDEO: + type = ATSParser::VIDEO; + key = "timeUsVideo"; + break; + + case LiveSession::STREAMTYPE_AUDIO: + type = ATSParser::AUDIO; + key = "timeUsAudio"; + break; - if (err != OK) { - return err; + case LiveSession::STREAMTYPE_SUBTITLES: + { + ALOGE("MPEG2 Transport streams do not contain subtitles."); + return ERROR_MALFORMED; + break; } - offset += 188; + default: + TRESPASS(); } - for (size_t i = mPacketSources.size(); i-- > 0;) { - sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); + sp<AnotherPacketSource> source = + static_cast<AnotherPacketSource *>( + mTSParser->getSource(type).get()); - ATSParser::SourceType type; - switch (mPacketSources.keyAt(i)) { - case LiveSession::STREAMTYPE_VIDEO: - type = ATSParser::VIDEO; - break; + if (source == NULL) { + continue; + } - case LiveSession::STREAMTYPE_AUDIO: - type = ATSParser::AUDIO; + int64_t timeUs; + sp<ABuffer> accessUnit; + status_t finalResult; + while (source->hasBufferAvailable(&finalResult) + && source->dequeueAccessUnit(&accessUnit) == OK) { + + CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); + if (mMinStartTimeUs > 0) { + if (timeUs < mMinStartTimeUs) { + // TODO untested path + // try a later ts + int32_t targetDuration; + mPlaylist->meta()->findInt32("target-duration", &targetDuration); + int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration; + if (incr == 0) { + // increment mSeqNumber by at least one + incr = 1; + } + mSeqNumber += incr; + err = -EAGAIN; break; + } else { + int64_t startTimeUs; + if (mStartTimeUsNotify != NULL + && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) { + mStartTimeUsNotify->setInt64(key, timeUs); + + uint32_t streamMask = 0; + mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); + streamMask |= mPacketSources.keyAt(i); + mStartTimeUsNotify->setInt32("streamMask", streamMask); + + if (streamMask == mStreamTypeMask) { + mStartTimeUsNotify->post(); + mStartTimeUsNotify.clear(); + } + } + } + } - case LiveSession::STREAMTYPE_SUBTITLES: - { - ALOGE("MPEG2 Transport streams do not contain subtitles."); - return ERROR_MALFORMED; + if (mStopParams != NULL) { + // Queue discontinuity in original stream. + int64_t stopTimeUs; + if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) { + packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); + mStreamTypeMask &= ~stream; + mPacketSources.removeItemsAt(i); break; } - - default: - TRESPASS(); } - sp<AnotherPacketSource> source = - static_cast<AnotherPacketSource *>( - mTSParser->getSource(type).get()); - - if (source == NULL) { - ALOGW("MPEG2 Transport stream does not contain %s data.", - type == ATSParser::VIDEO ? "video" : "audio"); + // Note that we do NOT dequeue any discontinuities except for format change. - mStreamTypeMask &= ~mPacketSources.keyAt(i); - mPacketSources.removeItemsAt(i); - continue; + // for simplicity, store a reference to the format in each unit + sp<MetaData> format = source->getFormat(); + if (format != NULL) { + accessUnit->meta()->setObject("format", format); } - sp<ABuffer> accessUnit; - status_t finalResult; - while (source->hasBufferAvailable(&finalResult) - && source->dequeueAccessUnit(&accessUnit) == OK) { - // Note that we do NOT dequeue any discontinuities. + // Stash the sequence number so we can hint future playlist where to start at. + accessUnit->meta()->setInt32("seq", mSeqNumber); + packetSource->queueAccessUnit(accessUnit); + } - packetSource->queueAccessUnit(accessUnit); - } + if (err != OK) { + break; + } + } - if (packetSource->getFormat() == NULL) { - packetSource->setFormat(source->getFormat()); - } + if (err != OK) { + for (size_t i = mPacketSources.size(); i-- > 0;) { + sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); + packetSource->clear(); } + return err; + } - return OK; - } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) { + if (!mStreamTypeMask) { + // Signal gap is filled between original and new stream. + ALOGV("ERROR OUT OF RANGE"); + return ERROR_OUT_OF_RANGE; + } + + return OK; +} + +status_t PlaylistFetcher::extractAndQueueAccessUnits( + const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { + if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) { if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { ALOGE("This stream only contains subtitles."); return ERROR_MALFORMED; @@ -811,6 +1196,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( CHECK(itemMeta->findInt64("durationUs", &durationUs)); buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); buffer->meta()->setInt64("durationUs", durationUs); + buffer->meta()->setInt32("seq", mSeqNumber); packetSource->queueAccessUnit(buffer); return OK; @@ -936,6 +1322,18 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( | (adtsHeader[4] << 3) | (adtsHeader[5] >> 5); + if (aac_frame_length == 0) { + const uint8_t *id3Header = adtsHeader; + if (!memcmp(id3Header, "ID3", 3)) { + ID3 id3(id3Header, buffer->size() - offset, true); + if (id3.isValid()) { + offset += id3.rawSize(); + continue; + }; + } + return ERROR_MALFORMED; + } + CHECK_LE(offset + aac_frame_length, buffer->size()); sp<ABuffer> unit = new ABuffer(aac_frame_length); @@ -947,6 +1345,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( // Each AAC frame encodes 1024 samples. numSamples += 1024; + unit->meta()->setInt32("seq", mSeqNumber); packetSource->queueAccessUnit(unit); offset += aac_frame_length; @@ -974,4 +1373,33 @@ void PlaylistFetcher::updateDuration() { msg->post(); } +int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) { + int64_t durationUs, threshold; + if (msg->findInt64("durationUs", &durationUs)) { + return kNumSkipFrames * durationUs; + } + + sp<RefBase> obj; + msg->findObject("format", &obj); + MetaData *format = static_cast<MetaData *>(obj.get()); + + const char *mime; + CHECK(format->findCString(kKeyMIMEType, &mime)); + bool audio = !strncasecmp(mime, "audio/", 6); + if (audio) { + // Assumes 1000 samples per frame. + int32_t sampleRate; + CHECK(format->findInt32(kKeySampleRate, &sampleRate)); + return kNumSkipFrames /* frames */ * 1000 /* samples */ + * (1000000 / sampleRate) /* sample duration (us) */; + } else { + int32_t frameRate; + if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) { + return kNumSkipFrames * (1000000 / frameRate); + } + } + + return 500000ll; +} + } // namespace android |