diff options
Diffstat (limited to 'media/libstagefright/httplive/PlaylistFetcher.cpp')
-rw-r--r-- | media/libstagefright/httplive/PlaylistFetcher.cpp | 514 |
1 files changed, 388 insertions, 126 deletions
diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp index 513f114..30fa868 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.cpp +++ b/media/libstagefright/httplive/PlaylistFetcher.cpp @@ -49,31 +49,36 @@ namespace android { // static const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; -const int32_t PlaylistFetcher::kDownloadBlockSize = 192; +const int32_t PlaylistFetcher::kDownloadBlockSize = 2048; const int32_t PlaylistFetcher::kNumSkipFrames = 10; PlaylistFetcher::PlaylistFetcher( const sp<AMessage> ¬ify, const sp<LiveSession> &session, - const char *uri) + const char *uri, + int32_t subtitleGeneration) : mNotify(notify), mStartTimeUsNotify(notify->dup()), mSession(session), mURI(uri), mStreamTypeMask(0), mStartTimeUs(-1ll), - mMinStartTimeUs(0ll), - mStopParams(NULL), + mSegmentStartTimeUs(-1ll), + mDiscontinuitySeq(-1ll), + mStartTimeUsRelative(false), mLastPlaylistFetchTimeUs(-1ll), mSeqNumber(-1), mNumRetries(0), mStartup(true), + mAdaptive(false), mPrepared(false), mNextPTSTimeUs(-1ll), mMonitorQueueGeneration(0), + mSubtitleGeneration(subtitleGeneration), mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), mFirstPTSValid(false), - mAbsoluteTimeAnchorUs(0ll) { + mAbsoluteTimeAnchorUs(0ll), + mVideoBuffer(new AnotherPacketSource(NULL)) { memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); mStartTimeUsNotify->setInt32("what", kWhatStartedAt); mStartTimeUsNotify->setInt32("streamMask", 0); @@ -317,7 +322,7 @@ void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { maxDelayUs = minDelayUs; } if (delayUs > maxDelayUs) { - ALOGV("Need to refresh playlist in %lld", maxDelayUs); + ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs); delayUs = maxDelayUs; } sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); @@ -334,8 +339,9 @@ void PlaylistFetcher::startAsync( const sp<AnotherPacketSource> &videoSource, const sp<AnotherPacketSource> &subtitleSource, int64_t startTimeUs, - int64_t minStartTimeUs, - int32_t startSeqNumberHint) { + int64_t segmentStartTimeUs, + int32_t startDiscontinuitySeq, + bool adaptive) { sp<AMessage> msg = new AMessage(kWhatStart, id()); uint32_t streamTypeMask = 0ul; @@ -357,8 +363,9 @@ void PlaylistFetcher::startAsync( msg->setInt32("streamTypeMask", streamTypeMask); msg->setInt64("startTimeUs", startTimeUs); - msg->setInt64("minStartTimeUs", minStartTimeUs); - msg->setInt32("startSeqNumberHint", startSeqNumberHint); + msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); + msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); + msg->setInt32("adaptive", adaptive); msg->post(); } @@ -366,9 +373,9 @@ void PlaylistFetcher::pauseAsync() { (new AMessage(kWhatPause, id()))->post(); } -void PlaylistFetcher::stopAsync(bool selfTriggered) { +void PlaylistFetcher::stopAsync(bool clear) { sp<AMessage> msg = new AMessage(kWhatStop, id()); - msg->setInt32("selfTriggered", selfTriggered); + msg->setInt32("clear", clear); msg->post(); } @@ -448,10 +455,13 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); int64_t startTimeUs; - int32_t startSeqNumberHint; + int64_t segmentStartTimeUs; + int32_t startDiscontinuitySeq; + int32_t adaptive; CHECK(msg->findInt64("startTimeUs", &startTimeUs)); - CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs)); - CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint)); + CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); + CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); + CHECK(msg->findInt32("adaptive", &adaptive)); if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { void *ptr; @@ -481,16 +491,16 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { } mStreamTypeMask = streamTypeMask; - mStartTimeUs = startTimeUs; - if (mStartTimeUs >= 0ll) { + mSegmentStartTimeUs = segmentStartTimeUs; + mDiscontinuitySeq = startDiscontinuitySeq; + + if (startTimeUs >= 0) { + mStartTimeUs = startTimeUs; mSeqNumber = -1; mStartup = true; mPrepared = false; - } - - if (startSeqNumberHint >= 0) { - mSeqNumber = startSeqNumberHint; + mAdaptive = adaptive; } postMonitorQueue(); @@ -505,11 +515,9 @@ void PlaylistFetcher::onPause() { void PlaylistFetcher::onStop(const sp<AMessage> &msg) { cancelMonitorQueue(); - int32_t selfTriggered; - CHECK(msg->findInt32("selfTriggered", &selfTriggered)); - if (!selfTriggered) { - // Self triggered stops only happen during switching, in which case we do not want - // to clear the discontinuities queued at the end of packet sources. + int32_t clear; + CHECK(msg->findInt32("clear", &clear)); + if (clear) { for (size_t i = 0; i < mPacketSources.size(); i++) { sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); packetSource->clear(); @@ -551,15 +559,16 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { } // Don't resume if we would stop within a resume threshold. + int32_t discontinuitySeq; int64_t latestTimeUs = 0, stopTimeUs = 0; - sp<AMessage> latestMeta = packetSource->getLatestMeta(); + sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta(); if (latestMeta != NULL - && (latestMeta->findInt64("timeUs", &latestTimeUs) - && params->findInt64(stopKey, &stopTimeUs))) { - int64_t diffUs = stopTimeUs - latestTimeUs; - if (diffUs < resumeThreshold(latestMeta)) { - stop = true; - } + && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq) + && discontinuitySeq == mDiscontinuitySeq + && latestMeta->findInt64("timeUs", &latestTimeUs) + && params->findInt64(stopKey, &stopTimeUs) + && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) { + stop = true; } } @@ -567,7 +576,7 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { for (size_t i = 0; i < mPacketSources.size(); i++) { mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); } - stopAsync(/* selfTriggered = */ true); + stopAsync(/* clear = */ false); return OK; } @@ -587,7 +596,10 @@ void PlaylistFetcher::notifyError(status_t err) { void PlaylistFetcher::queueDiscontinuity( ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { for (size_t i = 0; i < mPacketSources.size(); ++i) { - mPacketSources.valueAt(i)->queueDiscontinuity(type, extra); + // do not discard buffer upon #EXT-X-DISCONTINUITY tag + // (seek will discard buffer by abandoning old fetchers) + mPacketSources.valueAt(i)->queueDiscontinuity( + type, extra, false /* discard */); } } @@ -628,7 +640,7 @@ void PlaylistFetcher::onMonitorQueue() { int64_t bufferedStreamDurationUs = mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); - ALOGV("buffered %lld for stream %d", + ALOGV("buffered %" PRId64 " for stream %d", bufferedStreamDurationUs, mPacketSources.keyAt(i)); if (bufferedStreamDurationUs > bufferedDurationUs) { bufferedDurationUs = bufferedStreamDurationUs; @@ -641,7 +653,7 @@ void PlaylistFetcher::onMonitorQueue() { if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) { mPrepared = true; - ALOGV("prepared, buffered=%lld > %lld", + ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "", bufferedDurationUs, targetDurationUs); sp<AMessage> msg = mNotify->dup(); msg->setInt32("what", kWhatTemporarilyDoneFetching); @@ -649,7 +661,7 @@ void PlaylistFetcher::onMonitorQueue() { } if (finalResult == OK && downloadMore) { - ALOGV("monitoring, buffered=%lld < %lld", + ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "", bufferedDurationUs, durationToBufferUs); // delay the next download slightly; hopefully this gives other concurrent fetchers // a better chance to run. @@ -665,7 +677,7 @@ void PlaylistFetcher::onMonitorQueue() { msg->post(); int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; - ALOGV("pausing for %lld, buffered=%lld > %lld", + ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "", delayUs, bufferedDurationUs, durationToBufferUs); // :TRICKY: need to enforce minimum delay because the delay to // refresh the playlist will become 0 @@ -722,38 +734,55 @@ void PlaylistFetcher::onDownloadNext() { firstSeqNumberInPlaylist = 0; } - bool seekDiscontinuity = false; - bool explicitDiscontinuity = false; + bool discontinuity = false; const int32_t lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; - if (mStartup && mSeqNumber >= 0 - && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) { - // in case we guessed wrong during reconfiguration, try fetching the latest content. - mSeqNumber = lastSeqNumberInPlaylist; + if (mDiscontinuitySeq < 0) { + mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); } if (mSeqNumber < 0) { CHECK_GE(mStartTimeUs, 0ll); - if (mPlaylist->isComplete() || mPlaylist->isEvent()) { - mSeqNumber = getSeqNumberForTime(mStartTimeUs); - ALOGV("Initial sequence number for time %lld is %ld from (%ld .. %ld)", + if (mSegmentStartTimeUs < 0) { + if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { + // If this is a live session, start 3 segments from the end on connect + mSeqNumber = lastSeqNumberInPlaylist - 3; + if (mSeqNumber < firstSeqNumberInPlaylist) { + mSeqNumber = firstSeqNumberInPlaylist; + } + } else { + mSeqNumber = getSeqNumberForTime(mStartTimeUs); + mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber); + } + mStartTimeUsRelative = true; + ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)", mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); } else { - // If this is a live session, start 3 segments from the end. - mSeqNumber = lastSeqNumberInPlaylist - 3; + mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs); + if (mAdaptive) { + // avoid double fetch/decode + mSeqNumber += 1; + } + ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq); + if (mSeqNumber < minSeq) { + mSeqNumber = minSeq; + } + if (mSeqNumber < firstSeqNumberInPlaylist) { mSeqNumber = firstSeqNumberInPlaylist; } - ALOGV("Initial sequence number for live event %ld from (%ld .. %ld)", + + if (mSeqNumber > lastSeqNumberInPlaylist) { + mSeqNumber = lastSeqNumberInPlaylist; + } + ALOGV("Initial sequence number for live event %d from (%d .. %d)", mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); } - - mStartTimeUs = -1ll; } if (mSeqNumber < firstSeqNumberInPlaylist @@ -772,7 +801,8 @@ void PlaylistFetcher::onDownloadNext() { if (delayUs > kMaxMonitorDelayUs) { delayUs = kMaxMonitorDelayUs; } - ALOGV("sequence number high: %ld from (%ld .. %ld), monitor in %lld (retry=%d)", + ALOGV("sequence number high: %d from (%d .. %d), " + "monitor in %" PRId64 " (retry=%d)", mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist, delayUs, mNumRetries); postMonitorQueue(delayUs); @@ -790,7 +820,7 @@ void PlaylistFetcher::onDownloadNext() { if (mSeqNumber < firstSeqNumberInPlaylist) { mSeqNumber = firstSeqNumberInPlaylist; } - explicitDiscontinuity = true; + discontinuity = true; // fall through } else { @@ -815,7 +845,8 @@ void PlaylistFetcher::onDownloadNext() { int32_t val; if (itemMeta->findInt32("discontinuity", &val) && val != 0) { - explicitDiscontinuity = true; + mDiscontinuitySeq++; + discontinuity = true; } int64_t range_offset, range_length; @@ -846,6 +877,7 @@ void PlaylistFetcher::onDownloadNext() { } // block-wise download + bool startup = mStartup; ssize_t bytesRead; do { bytesRead = mSession->fetchFile( @@ -875,7 +907,7 @@ void PlaylistFetcher::onDownloadNext() { return; } - if (mStartup || seekDiscontinuity || explicitDiscontinuity) { + if (startup || discontinuity) { // Signal discontinuity. if (mPlaylist->isComplete() || mPlaylist->isEvent()) { @@ -885,16 +917,17 @@ void PlaylistFetcher::onDownloadNext() { mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); } - if (seekDiscontinuity || explicitDiscontinuity) { - ALOGI("queueing discontinuity (seek=%d, explicit=%d)", - seekDiscontinuity, explicitDiscontinuity); + if (discontinuity) { + ALOGI("queueing discontinuity (explicit=%d)", discontinuity); queueDiscontinuity( - explicitDiscontinuity - ? ATSParser::DISCONTINUITY_FORMATCHANGE - : ATSParser::DISCONTINUITY_SEEK, + ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */); + + discontinuity = false; } + + startup = false; } err = OK; @@ -914,23 +947,19 @@ void PlaylistFetcher::onDownloadNext() { } if (err == -EAGAIN) { - // bad starting sequence number hint + // starting sequence number too low/high + mTSParser.clear(); postMonitorQueue(); return; - } - - if (err == ERROR_OUT_OF_RANGE) { + } else if (err == ERROR_OUT_OF_RANGE) { // reached stopping point - stopAsync(/* selfTriggered = */ true); + stopAsync(/* clear = */ false); return; - } - - if (err != OK) { + } else if (err != OK) { notifyError(err); return; } - mStartup = false; } while (bytesRead != 0); if (bufferStartsWithTsSyncByte(buffer)) { @@ -982,7 +1011,16 @@ void PlaylistFetcher::onDownloadNext() { // bulk extract non-ts files if (tsBuffer == NULL) { - err = extractAndQueueAccessUnits(buffer, itemMeta); + err = extractAndQueueAccessUnits(buffer, itemMeta); + if (err == -EAGAIN) { + // starting sequence number too low/high + postMonitorQueue(); + return; + } else if (err == ERROR_OUT_OF_RANGE) { + // reached stopping point + stopAsync(/* clear = */false); + return; + } } if (err != OK) { @@ -995,6 +1033,66 @@ void PlaylistFetcher::onDownloadNext() { postMonitorQueue(); } +int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const { + int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; + if (mPlaylist->meta() == NULL + || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { + firstSeqNumberInPlaylist = 0; + } + lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1; + + int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1; + while (index >= 0 && anchorTimeUs > mStartTimeUs) { + sp<AMessage> itemMeta; + CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); + + int64_t itemDurationUs; + CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); + + anchorTimeUs -= itemDurationUs; + --index; + } + + int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1; + if (newSeqNumber <= lastSeqNumberInPlaylist) { + return newSeqNumber; + } else { + return lastSeqNumberInPlaylist; + } +} + +int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { + int32_t firstSeqNumberInPlaylist; + if (mPlaylist->meta() == NULL + || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { + firstSeqNumberInPlaylist = 0; + } + + size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); + if (discontinuitySeq < curDiscontinuitySeq) { + return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1); + } + + size_t index = 0; + while (index < mPlaylist->size()) { + sp<AMessage> itemMeta; + CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); + + int64_t discontinuity; + if (itemMeta->findInt64("discontinuity", &discontinuity)) { + curDiscontinuitySeq++; + } + + if (curDiscontinuitySeq == discontinuitySeq) { + return firstSeqNumberInPlaylist + index; + } + + ++index; + } + + return firstSeqNumberInPlaylist + mPlaylist->size(); +} + int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { int32_t firstSeqNumberInPlaylist; if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( @@ -1027,6 +1125,23 @@ int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { return firstSeqNumberInPlaylist + index; } +const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties( + const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) { + sp<MetaData> format = source->getFormat(); + if (format != NULL) { + // for simplicity, store a reference to the format in each unit + accessUnit->meta()->setObject("format", format); + } + + if (discard) { + accessUnit->meta()->setInt32("discard", discard); + } + + accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); + accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); + return accessUnit; +} + status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { if (mTSParser == NULL) { // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. @@ -1042,7 +1157,9 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu mTSParser->signalDiscontinuity( ATSParser::DISCONTINUITY_SEEK, extra); + mAbsoluteTimeAnchorUs = mNextPTSTimeUs; mNextPTSTimeUs = -1ll; + mFirstPTSValid = false; } size_t offset = 0; @@ -1102,43 +1219,108 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu && source->dequeueAccessUnit(&accessUnit) == OK) { CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); - if (mMinStartTimeUs > 0) { - if (timeUs < mMinStartTimeUs) { - // TODO untested path - // try a later ts - int32_t targetDuration; - mPlaylist->meta()->findInt32("target-duration", &targetDuration); - int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration; - if (incr == 0) { - // increment mSeqNumber by at least one - incr = 1; + + if (mStartup) { + if (!mFirstPTSValid) { + mFirstTimeUs = timeUs; + mFirstPTSValid = true; + } + if (mStartTimeUsRelative) { + timeUs -= mFirstTimeUs; + if (timeUs < 0) { + timeUs = 0; } - mSeqNumber += incr; - err = -EAGAIN; - break; - } else { - int64_t startTimeUs; - if (mStartTimeUsNotify != NULL - && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) { - mStartTimeUsNotify->setInt64(key, timeUs); - - uint32_t streamMask = 0; - mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); - streamMask |= mPacketSources.keyAt(i); - mStartTimeUsNotify->setInt32("streamMask", streamMask); - - if (streamMask == mStreamTypeMask) { - mStartTimeUsNotify->post(); - mStartTimeUsNotify.clear(); - } + } + + if (timeUs < mStartTimeUs) { + // buffer up to the closest preceding IDR frame + ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us", + timeUs, mStartTimeUs); + const char *mime; + sp<MetaData> format = source->getFormat(); + bool isAvc = false; + if (format != NULL && format->findCString(kKeyMIMEType, &mime) + && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) { + isAvc = true; + } + if (isAvc && IsIDR(accessUnit)) { + mVideoBuffer->clear(); + } + if (isAvc) { + mVideoBuffer->queueAccessUnit(accessUnit); + } + + continue; + } + } + + CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); + if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) { + + int32_t targetDurationSecs; + CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); + int64_t targetDurationUs = targetDurationSecs * 1000000ll; + // mStartup + // mStartup is true until we have queued a packet for all the streams + // we are fetching. We queue packets whose timestamps are greater than + // mStartTimeUs. + // mSegmentStartTimeUs >= 0 + // mSegmentStartTimeUs is non-negative when adapting or switching tracks + // timeUs - mStartTimeUs > targetDurationUs: + // This and the 2 above conditions should only happen when adapting in a live + // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher + // would start fetching after timeUs, which should be greater than mStartTimeUs; + // the old fetcher would then continue fetching data until timeUs. We don't want + // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to + // stop as early as possible. The definition of being "too far ahead" is + // arbitrary; here we use targetDurationUs as threshold. + if (mStartup && mSegmentStartTimeUs >= 0 + && timeUs - mStartTimeUs > targetDurationUs) { + // we just guessed a starting timestamp that is too high when adapting in a + // live stream; re-adjust based on the actual timestamp extracted from the + // media segment; if we didn't move backward after the re-adjustment + // (newSeqNumber), start at least 1 segment prior. + int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); + if (newSeqNumber >= mSeqNumber) { + --mSeqNumber; + } else { + mSeqNumber = newSeqNumber; + } + mStartTimeUsNotify = mNotify->dup(); + mStartTimeUsNotify->setInt32("what", kWhatStartedAt); + return -EAGAIN; + } + + int32_t seq; + if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) { + mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); + } + int64_t startTimeUs; + if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) { + mStartTimeUsNotify->setInt64(key, timeUs); + + uint32_t streamMask = 0; + mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); + streamMask |= mPacketSources.keyAt(i); + mStartTimeUsNotify->setInt32("streamMask", streamMask); + + if (streamMask == mStreamTypeMask) { + mStartup = false; + mStartTimeUsNotify->post(); + mStartTimeUsNotify.clear(); } } } if (mStopParams != NULL) { // Queue discontinuity in original stream. + int32_t discontinuitySeq; int64_t stopTimeUs; - if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) { + if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) + || discontinuitySeq > mDiscontinuitySeq + || !mStopParams->findInt64(key, &stopTimeUs) + || (discontinuitySeq == mDiscontinuitySeq + && timeUs >= stopTimeUs)) { packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); mStreamTypeMask &= ~stream; mPacketSources.removeItemsAt(i); @@ -1147,15 +1329,18 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu } // Note that we do NOT dequeue any discontinuities except for format change. - - // for simplicity, store a reference to the format in each unit - sp<MetaData> format = source->getFormat(); - if (format != NULL) { - accessUnit->meta()->setObject("format", format); + if (stream == LiveSession::STREAMTYPE_VIDEO) { + const bool discard = true; + status_t status; + while (mVideoBuffer->hasBufferAvailable(&status)) { + sp<ABuffer> videoBuffer; + mVideoBuffer->dequeueAccessUnit(&videoBuffer); + setAccessUnitProperties(videoBuffer, source, discard); + packetSource->queueAccessUnit(videoBuffer); + } } - // Stash the sequence number so we can hint future playlist where to start at. - accessUnit->meta()->setInt32("seq", mSeqNumber); + setAccessUnitProperties(accessUnit, source); packetSource->queueAccessUnit(accessUnit); } @@ -1181,9 +1366,35 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu return OK; } +/* static */ +bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( + const sp<ABuffer> &buffer) { + size_t pos = 0; + + // skip possible BOM + if (buffer->size() >= pos + 3 && + !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { + pos += 3; + } + + // accept WEBVTT followed by SPACE, TAB or (CR) LF + if (buffer->size() < pos + 6 || + memcmp("WEBVTT", buffer->data() + pos, 6)) { + return false; + } + pos += 6; + + if (buffer->size() == pos) { + return true; + } + + uint8_t sep = buffer->data()[pos]; + return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; +} + status_t PlaylistFetcher::extractAndQueueAccessUnits( const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { - if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) { + if (bufferStartsWithWebVTTMagicSequence(buffer)) { if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { ALOGE("This stream only contains subtitles."); return ERROR_MALFORMED; @@ -1196,7 +1407,9 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( CHECK(itemMeta->findInt64("durationUs", &durationUs)); buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); buffer->meta()->setInt64("durationUs", durationUs); - buffer->meta()->setInt32("seq", mSeqNumber); + buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); + buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); + buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration); packetSource->queueAccessUnit(buffer); return OK; @@ -1262,14 +1475,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( firstID3Tag = false; } - if (!mFirstPTSValid) { - mFirstPTSValid = true; - mFirstPTS = PTS; - } - PTS -= mFirstPTS; - - int64_t timeUs = (PTS * 100ll) / 9ll + mAbsoluteTimeAnchorUs; - if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { ALOGW("This stream only contains audio data!"); @@ -1312,6 +1517,12 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( int32_t sampleRate; CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); + int64_t timeUs = (PTS * 100ll) / 9ll; + if (!mFirstPTSValid) { + mFirstPTSValid = true; + mFirstTimeUs = timeUs; + } + size_t offset = 0; while (offset < buffer->size()) { const uint8_t *adtsHeader = buffer->data() + offset; @@ -1336,19 +1547,70 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits( CHECK_LE(offset + aac_frame_length, buffer->size()); - sp<ABuffer> unit = new ABuffer(aac_frame_length); - memcpy(unit->data(), adtsHeader, aac_frame_length); - int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; - unit->meta()->setInt64("timeUs", unitTimeUs); + offset += aac_frame_length; // Each AAC frame encodes 1024 samples. numSamples += 1024; - unit->meta()->setInt32("seq", mSeqNumber); - packetSource->queueAccessUnit(unit); + if (mStartup) { + int64_t startTimeUs = unitTimeUs; + if (mStartTimeUsRelative) { + startTimeUs -= mFirstTimeUs; + if (startTimeUs < 0) { + startTimeUs = 0; + } + } + if (startTimeUs < mStartTimeUs) { + continue; + } - offset += aac_frame_length; + if (mStartTimeUsNotify != NULL) { + int32_t targetDurationSecs; + CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); + int64_t targetDurationUs = targetDurationSecs * 1000000ll; + + // Duplicated logic from how we handle .ts playlists. + if (mStartup && mSegmentStartTimeUs >= 0 + && timeUs - mStartTimeUs > targetDurationUs) { + int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); + if (newSeqNumber >= mSeqNumber) { + --mSeqNumber; + } else { + mSeqNumber = newSeqNumber; + } + return -EAGAIN; + } + + mStartTimeUsNotify->setInt64("timeUsAudio", timeUs); + mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); + mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); + mStartTimeUsNotify->post(); + mStartTimeUsNotify.clear(); + } + } + + if (mStopParams != NULL) { + // Queue discontinuity in original stream. + int32_t discontinuitySeq; + int64_t stopTimeUs; + if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) + || discontinuitySeq > mDiscontinuitySeq + || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) + || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { + packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); + mStreamTypeMask = 0; + mPacketSources.clear(); + return ERROR_OUT_OF_RANGE; + } + } + + sp<ABuffer> unit = new ABuffer(aac_frame_length); + memcpy(unit->data(), adtsHeader, aac_frame_length); + + unit->meta()->setInt64("timeUs", unitTimeUs); + setAccessUnitProperties(unit, packetSource); + packetSource->queueAccessUnit(unit); } return OK; |