diff options
Diffstat (limited to 'media/libstagefright/httplive/LiveSession.cpp')
-rw-r--r-- | media/libstagefright/httplive/LiveSession.cpp | 279 |
1 files changed, 220 insertions, 59 deletions
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index 10cdde2..8667a6b 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -57,7 +57,7 @@ LiveSession::LiveSession( mHTTPService(httpService), mInPreparationPhase(true), mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), - mPrevBandwidthIndex(-1), + mCurBandwidthIndex(-1), mStreamMask(0), mNewStreamMask(0), mSwapMask(0), @@ -68,13 +68,17 @@ LiveSession::LiveSession( mReconfigurationInProgress(false), mSwitchInProgress(false), mDisconnectReplyID(0), - mSeekReplyID(0) { + mSeekReplyID(0), + mFirstTimeUsValid(false), + mFirstTimeUs(0), + mLastSeekTimeUs(0) { mStreams[kAudioIndex] = StreamItem("audio"); mStreams[kVideoIndex] = StreamItem("video"); mStreams[kSubtitleIndex] = StreamItem("subtitles"); for (size_t i = 0; i < kMaxStreams; ++i) { + mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); } @@ -109,31 +113,65 @@ status_t LiveSession::dequeueAccessUnit( return -EWOULDBLOCK; } + status_t finalResult; + sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); + if (discontinuityQueue->hasBufferAvailable(&finalResult)) { + discontinuityQueue->dequeueAccessUnit(accessUnit); + // seeking, track switching + sp<AMessage> extra; + int64_t timeUs; + if ((*accessUnit)->meta()->findMessage("extra", &extra) + && extra != NULL + && extra->findInt64("timeUs", &timeUs)) { + // seeking only + mLastSeekTimeUs = timeUs; + mDiscontinuityOffsetTimesUs.clear(); + mDiscontinuityAbsStartTimesUs.clear(); + } + return INFO_DISCONTINUITY; + } + sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); - status_t finalResult; if (!packetSource->hasBufferAvailable(&finalResult)) { return finalResult == OK ? -EAGAIN : finalResult; } + // wait for counterpart + sp<AnotherPacketSource> otherSource; + if (stream == STREAMTYPE_AUDIO && (mStreamMask & STREAMTYPE_VIDEO)) { + otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); + } else if (stream == STREAMTYPE_VIDEO && (mStreamMask & STREAMTYPE_AUDIO)) { + otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); + } + if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { + return finalResult == OK ? -EAGAIN : finalResult; + } + status_t err = packetSource->dequeueAccessUnit(accessUnit); + size_t streamIdx; const char *streamStr; switch (stream) { case STREAMTYPE_AUDIO: + streamIdx = kAudioIndex; streamStr = "audio"; break; case STREAMTYPE_VIDEO: + streamIdx = kVideoIndex; streamStr = "video"; break; case STREAMTYPE_SUBTITLES: + streamIdx = kSubtitleIndex; streamStr = "subs"; break; default: TRESPASS(); } + StreamItem& strm = mStreams[streamIdx]; if (err == INFO_DISCONTINUITY) { + // adaptive streaming, discontinuities in the playlist int32_t type; CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); @@ -148,10 +186,7 @@ status_t LiveSession::dequeueAccessUnit( extra == NULL ? "NULL" : extra->debugString().c_str()); int32_t swap; - if (type == ATSParser::DISCONTINUITY_FORMATCHANGE - && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) - && swap) { - + if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { int32_t switchGeneration; CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); { @@ -164,13 +199,67 @@ status_t LiveSession::dequeueAccessUnit( msg->post(); } } + } else { + size_t seq = strm.mCurDiscontinuitySeq; + int64_t offsetTimeUs; + if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { + offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); + } else { + offsetTimeUs = 0; + } + + seq += 1; + if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + int64_t firstTimeUs; + firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); + offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; + offsetTimeUs += strm.mLastSampleDurationUs; + } else { + offsetTimeUs += strm.mLastSampleDurationUs; + } + + mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); } } else if (err == OK) { + if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { int64_t timeUs; + int32_t discontinuitySeq = 0; CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); - ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); + (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); + strm.mCurDiscontinuitySeq = discontinuitySeq; + + int32_t discard = 0; + int64_t firstTimeUs; + if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { + int64_t durUs; // approximate sample duration + if (timeUs > strm.mLastDequeuedTimeUs) { + durUs = timeUs - strm.mLastDequeuedTimeUs; + } else { + durUs = strm.mLastDequeuedTimeUs - timeUs; + } + strm.mLastSampleDurationUs = durUs; + firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); + } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { + firstTimeUs = timeUs; + } else { + mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); + firstTimeUs = timeUs; + } + + strm.mLastDequeuedTimeUs = timeUs; + if (timeUs >= firstTimeUs) { + timeUs -= firstTimeUs; + } else { + timeUs = 0; + } + timeUs += mLastSeekTimeUs; + if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { + timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); + } + ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); + (*accessUnit)->meta()->setInt64("timeUs", timeUs); mLastDequeuedTimeUs = timeUs; mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; } else if (stream == STREAMTYPE_SUBTITLES) { @@ -289,7 +378,9 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } - tryToFinishBandwidthSwitch(); + if (mSwitchInProgress) { + tryToFinishBandwidthSwitch(); + } } if (mContinuation != NULL) { @@ -538,8 +629,9 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { mBandwidthItems.push(item); } + mPlaylist->pickRandomMediaItems(); changeConfiguration( - 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); + 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); } void LiveSession::finishDisconnect() { @@ -847,20 +939,20 @@ size_t LiveSession::getBandwidthIndex() { // to lowest) const size_t kMinIndex = 0; - static ssize_t mPrevBandwidthIndex = -1; + static ssize_t mCurBandwidthIndex = -1; size_t index; - if (mPrevBandwidthIndex < 0) { + if (mCurBandwidthIndex < 0) { index = kMinIndex; } else if (uniformRand() < 0.5) { - index = (size_t)mPrevBandwidthIndex; + index = (size_t)mCurBandwidthIndex; } else { - index = mPrevBandwidthIndex + 1; + index = mCurBandwidthIndex + 1; if (index == mBandwidthItems.size()) { index = kMinIndex; } } - mPrevBandwidthIndex = index; + mCurBandwidthIndex = index; #elif 0 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec @@ -937,7 +1029,10 @@ sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { status_t LiveSession::selectTrack(size_t index, bool select) { status_t err = mPlaylist->selectTrack(index, select); if (err == OK) { - (new AMessage(kWhatChangeConfiguration, id()))->post(); + sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); + msg->setInt32("bandwidthIndex", mCurBandwidthIndex); + msg->setInt32("pickTrack", select); + msg->post(); } return err; } @@ -964,15 +1059,11 @@ void LiveSession::changeConfiguration( CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; - mPrevBandwidthIndex = bandwidthIndex; + mCurBandwidthIndex = bandwidthIndex; ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", timeUs, bandwidthIndex, pickTrack); - if (pickTrack) { - mPlaylist->pickRandomMediaItems(); - } - CHECK_LT(bandwidthIndex, mBandwidthItems.size()); const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); @@ -995,14 +1086,15 @@ void LiveSession::changeConfiguration( // If we're seeking all current fetchers are discarded. if (timeUs < 0ll) { - // delay fetcher removal - discardFetcher = false; + // delay fetcher removal if not picking tracks + discardFetcher = pickTrack; for (size_t j = 0; j < kMaxStreams; ++j) { StreamType type = indexToType(j); if ((streamMask & type) && uri == URIs[j]) { resumeMask |= type; streamMask &= ~type; + discardFetcher = false; } } } @@ -1016,16 +1108,17 @@ void LiveSession::changeConfiguration( sp<AMessage> msg; if (timeUs < 0ll) { - // skip onChangeConfiguration2 (decoder destruction) if switching. + // skip onChangeConfiguration2 (decoder destruction) if not seeking. msg = new AMessage(kWhatChangeConfiguration3, id()); } else { msg = new AMessage(kWhatChangeConfiguration2, id()); } msg->setInt32("streamMask", streamMask); msg->setInt32("resumeMask", resumeMask); + msg->setInt32("pickTrack", pickTrack); msg->setInt64("timeUs", timeUs); for (size_t i = 0; i < kMaxStreams; ++i) { - if (streamMask & indexToType(i)) { + if ((streamMask | resumeMask) & indexToType(i)) { msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); } } @@ -1049,7 +1142,10 @@ void LiveSession::changeConfiguration( void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { if (!mReconfigurationInProgress) { - changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); + int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; + msg->findInt32("pickTrack", &pickTrack); + msg->findInt32("bandwidthIndex", &bandwidthIndex); + changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); } else { msg->post(1000000ll); // retry in 1 sec } @@ -1060,8 +1156,14 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { // All fetchers are either suspended or have been removed now. - uint32_t streamMask; + uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); + CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); + + // currently onChangeConfiguration2 is only called for seeking; + // remove the following CHECK if using it else where. + CHECK_EQ(resumeMask, 0); + streamMask |= resumeMask; AString URIs[kMaxStreams]; for (size_t i = 0; i < kMaxStreams; ++i) { @@ -1125,16 +1227,21 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { } int64_t timeUs; + int32_t pickTrack; bool switching = false; CHECK(msg->findInt64("timeUs", &timeUs)); + CHECK(msg->findInt32("pickTrack", &pickTrack)); if (timeUs < 0ll) { - timeUs = mLastDequeuedTimeUs; - switching = true; + if (!pickTrack) { + switching = true; + } + mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; + } else { + mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; } - mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; - mNewStreamMask = streamMask; + mNewStreamMask = streamMask | resumeMask; // Of all existing fetchers: // * Resume fetchers that are still needed and assign them original packet sources. @@ -1147,6 +1254,16 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { for (size_t j = 0; j < kMaxStreams; ++j) { if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); + + if (j != kSubtitleIndex) { + ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); + sp<AnotherPacketSource> discontinuityQueue; + discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); + discontinuityQueue->queueDiscontinuity( + ATSParser::DISCONTINUITY_NONE, + NULL, + true); + } } } @@ -1180,7 +1297,9 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { CHECK(fetcher != NULL); int32_t latestSeq = -1; - int64_t latestTimeUs = 0ll; + int64_t startTimeUs = -1; + int64_t segmentStartTimeUs = -1ll; + int32_t discontinuitySeq = -1; sp<AnotherPacketSource> sources[kMaxStreams]; // TRICKY: looping from i as earlier streams are already removed from streamMask @@ -1188,29 +1307,65 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); - if (!switching) { + if (timeUs >= 0) { sources[j]->clear(); + startTimeUs = timeUs; + + sp<AnotherPacketSource> discontinuityQueue; + sp<AMessage> extra = new AMessage; + extra->setInt64("timeUs", timeUs); + discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); + discontinuityQueue->queueDiscontinuity( + ATSParser::DISCONTINUITY_SEEK, extra, true); } else { - int32_t type, seq; - int64_t srcTimeUs; - sp<AMessage> meta = sources[j]->getLatestMeta(); + int32_t type; + int64_t srcSegmentStartTimeUs; + sp<AMessage> meta; + if (pickTrack) { + // selecting + meta = sources[j]->getLatestDequeuedMeta(); + } else { + // adapting + meta = sources[j]->getLatestEnqueuedMeta(); + } if (meta != NULL && !meta->findInt32("discontinuity", &type)) { - CHECK(meta->findInt32("seq", &seq)); - if (seq > latestSeq) { - latestSeq = seq; + int64_t tmpUs; + CHECK(meta->findInt64("timeUs", &tmpUs)); + if (startTimeUs < 0 || tmpUs < startTimeUs) { + startTimeUs = tmpUs; + } + + CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); + if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { + segmentStartTimeUs = tmpUs; } - CHECK(meta->findInt64("timeUs", &srcTimeUs)); - if (srcTimeUs > latestTimeUs) { - latestTimeUs = srcTimeUs; + + int32_t seq; + CHECK(meta->findInt32("discontinuitySeq", &seq)); + if (discontinuitySeq < 0 || seq < discontinuitySeq) { + discontinuitySeq = seq; } } - sources[j] = mPacketSources2.valueFor(indexToType(j)); - sources[j]->clear(); - uint32_t extraStreams = mNewStreamMask & (~mStreamMask); - if (extraStreams & indexToType(j)) { - sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false)); + if (pickTrack) { + // selecting track, queue discontinuities before content + sources[j]->clear(); + if (j == kSubtitleIndex) { + break; + } + sp<AnotherPacketSource> discontinuityQueue; + discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); + discontinuityQueue->queueDiscontinuity( + ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); + } else { + // adapting, queue discontinuities after resume + sources[j] = mPacketSources2.valueFor(indexToType(j)); + sources[j]->clear(); + uint32_t extraStreams = mNewStreamMask & (~mStreamMask); + if (extraStreams & indexToType(j)) { + sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); + } } } @@ -1222,9 +1377,10 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], - timeUs, - latestTimeUs /* min start time(us) */, - latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ ); + startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, + segmentStartTimeUs, + discontinuitySeq, + switching); } // All fetchers have now been started, the configuration change @@ -1236,6 +1392,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { mReconfigurationInProgress = false; if (switching) { mSwitchInProgress = true; + mSwapMask = streamMask; } else { mStreamMask = mNewStreamMask; } @@ -1254,8 +1411,8 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) { int32_t stream; CHECK(msg->findInt32("stream", &stream)); - mSwapMask |= stream; - if (mSwapMask != mStreamMask) { + mSwapMask &= ~stream; + if (mSwapMask != 0) { return; } @@ -1271,9 +1428,12 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) { } // Mark switch done when: -// 1. all old buffers are swapped out, AND -// 2. all old fetchers are removed. +// 1. all old buffers are swapped out void LiveSession::tryToFinishBandwidthSwitch() { + if (!mSwitchInProgress) { + return; + } + bool needToRemoveFetchers = false; for (size_t i = 0; i < mFetcherInfos.size(); ++i) { if (mFetcherInfos.valueAt(i).mToBeRemoved) { @@ -1281,10 +1441,11 @@ void LiveSession::tryToFinishBandwidthSwitch() { break; } } - if (!needToRemoveFetchers && mSwapMask == mStreamMask) { + + if (!needToRemoveFetchers && mSwapMask == 0) { + ALOGI("mSwitchInProgress = false"); mStreamMask = mNewStreamMask; mSwitchInProgress = false; - mSwapMask = 0; } } @@ -1310,13 +1471,13 @@ bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { return false; } - if (mPrevBandwidthIndex < 0) { + if (mCurBandwidthIndex < 0) { return true; } - if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { + if (bandwidthIndex == (size_t)mCurBandwidthIndex) { return false; - } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { + } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { return canSwitchUp(); } else { return true; |