From 0dd229bb306df68b88e7419b3cc11fc6175be1da Mon Sep 17 00:00:00 2001 From: Robert Shih Date: Fri, 6 Mar 2015 16:45:59 -0800 Subject: AnotherPacketSource: make getBufferedDurationUs more discontinuity-aware The new getBufferedDurationUs implementation obsoletes the purpose of getEstimatedDurationUs; remove getEstimatedDurationUs and its associated member variables. Finally replace calls to getEstimatedDurationUs with getBufferedDurationUs. Change-Id: I38f20df8e177ffbfe299b203d99076fc98dcd274 --- media/libstagefright/httplive/LiveSession.cpp | 3 +- .../libstagefright/mpeg2ts/AnotherPacketSource.cpp | 180 ++++++++++----------- media/libstagefright/mpeg2ts/AnotherPacketSource.h | 24 ++- 3 files changed, 102 insertions(+), 105 deletions(-) (limited to 'media') diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index ab5bb3a..9ac764c 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -2262,8 +2262,9 @@ bool LiveSession::checkBuffering( continue; } + status_t finalResult; int64_t bufferedDurationUs = - mPacketSources[i]->getEstimatedDurationUs(); + mPacketSources[i]->getBufferedDurationUs(&finalResult); ALOGV("[%s] buffered %lld us", getNameForStream(mPacketSources.keyAt(i)), (long long)bufferedDurationUs); diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp index 87ec860..f74b859 100644 --- a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp +++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp @@ -46,9 +46,10 @@ AnotherPacketSource::AnotherPacketSource(const sp &meta) mLastQueuedTimeUs(0), mEOSResult(OK), mLatestEnqueuedMeta(NULL), - mLatestDequeuedMeta(NULL), - mQueuedDiscontinuityCount(0) { + mLatestDequeuedMeta(NULL) { setFormat(meta); + + mDiscontinuitySegments.push_back(DiscontinuitySegment()); } void AnotherPacketSource::setFormat(const sp &meta) { @@ -129,11 +130,20 @@ status_t AnotherPacketSource::dequeueAccessUnit(sp *buffer) { mFormat.clear(); } - --mQueuedDiscontinuityCount; + mDiscontinuitySegments.erase(mDiscontinuitySegments.begin()); + // CHECK(!mDiscontinuitySegments.empty()); return INFO_DISCONTINUITY; } + // CHECK(!mDiscontinuitySegments.empty()); + DiscontinuitySegment &seg = *mDiscontinuitySegments.begin(); + + int64_t timeUs; mLatestDequeuedMeta = (*buffer)->meta()->dup(); + CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs)); + if (timeUs > seg.mMaxDequeTimeUs) { + seg.mMaxDequeTimeUs = timeUs; + } sp object; if ((*buffer)->meta()->findObject("format", &object)) { @@ -172,6 +182,8 @@ status_t AnotherPacketSource::read( mFormat.clear(); } + mDiscontinuitySegments.erase(mDiscontinuitySegments.begin()); + // CHECK(!mDiscontinuitySegments.empty()); return INFO_DISCONTINUITY; } @@ -184,6 +196,11 @@ status_t AnotherPacketSource::read( int64_t timeUs; CHECK(buffer->meta()->findInt64("timeUs", &timeUs)); + // CHECK(!mDiscontinuitySegments.empty()); + DiscontinuitySegment &seg = *mDiscontinuitySegments.begin(); + if (timeUs > seg.mMaxDequeTimeUs) { + seg.mMaxDequeTimeUs = timeUs; + } MediaBuffer *mediaBuffer = new MediaBuffer(buffer); @@ -226,12 +243,14 @@ void AnotherPacketSource::queueAccessUnit(const sp &buffer) { mCondition.signal(); int32_t discontinuity; - if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { - // discontinuity handling needs to be consistent with queueDiscontinuity() - ++mQueuedDiscontinuityCount; + if (buffer->meta()->findInt32("discontinuity", &discontinuity)){ + ALOGV("queueing a discontinuity with queueAccessUnit"); + mLastQueuedTimeUs = 0ll; mEOSResult = OK; mLatestEnqueuedMeta = NULL; + + mDiscontinuitySegments.push_back(DiscontinuitySegment()); return; } @@ -241,6 +260,15 @@ void AnotherPacketSource::queueAccessUnit(const sp &buffer) { ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)", mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6); + // CHECK(!mDiscontinuitySegments.empty()); + DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end()); + if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) { + tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs; + } + if (tailSeg.mMaxDequeTimeUs == -1) { + tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs; + } + if (mLatestEnqueuedMeta == NULL) { mLatestEnqueuedMeta = buffer->meta()->dup(); } else { @@ -264,7 +292,9 @@ void AnotherPacketSource::clear() { mBuffers.clear(); mEOSResult = OK; - mQueuedDiscontinuityCount = 0; + + mDiscontinuitySegments.clear(); + mDiscontinuitySegments.push_back(DiscontinuitySegment()); mFormat = NULL; mLatestEnqueuedMeta = NULL; @@ -291,6 +321,14 @@ void AnotherPacketSource::queueDiscontinuity( ++it; } + + for (List::iterator it2 = mDiscontinuitySegments.begin(); + it2 != mDiscontinuitySegments.end(); + ++it2) { + DiscontinuitySegment &seg = *it2; + seg.clear(); + } + } mEOSResult = OK; @@ -301,7 +339,8 @@ void AnotherPacketSource::queueDiscontinuity( return; } - ++mQueuedDiscontinuityCount; + mDiscontinuitySegments.push_back(DiscontinuitySegment()); + sp buffer = new ABuffer(0); buffer->meta()->setInt32("discontinuity", static_cast(type)); buffer->meta()->setMessage("extra", extra); @@ -352,95 +391,19 @@ bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) { int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) { Mutex::Autolock autoLock(mLock); - return getBufferedDurationUs_l(finalResult); -} - -int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) { *finalResult = mEOSResult; - if (mBuffers.empty()) { - return 0; - } - - int64_t time1 = -1; - int64_t time2 = -1; int64_t durationUs = 0; - - List >::iterator it; - for (it = mBuffers.begin(); it != mBuffers.end(); it++) { - const sp &buffer = *it; - - int32_t discard; - if (buffer->meta()->findInt32("discard", &discard) && discard) { - continue; - } - - int64_t timeUs; - if (buffer->meta()->findInt64("timeUs", &timeUs)) { - if (time1 < 0 || timeUs < time1) { - time1 = timeUs; - } - - if (time2 < 0 || timeUs > time2) { - time2 = timeUs; - } - } else { - // This is a discontinuity, reset everything. - durationUs += time2 - time1; - time1 = time2 = -1; - } - } - - return durationUs + (time2 - time1); -} - -// A cheaper but less precise version of getBufferedDurationUs that we would like to use in -// LiveSession::dequeueAccessUnit to trigger downwards adaptation. -int64_t AnotherPacketSource::getEstimatedDurationUs() { - Mutex::Autolock autoLock(mLock); - if (mBuffers.empty()) { - return 0; - } - - if (mQueuedDiscontinuityCount > 0) { - status_t finalResult; - return getBufferedDurationUs_l(&finalResult); - } - - sp buffer; - int32_t discard; - int64_t startTimeUs = -1ll; - List >::iterator it; - for (it = mBuffers.begin(); it != mBuffers.end(); it++) { - buffer = *it; - if (buffer->meta()->findInt32("discard", &discard) && discard) { - continue; - } - buffer->meta()->findInt64("timeUs", &startTimeUs); - break; + for (List::iterator it = mDiscontinuitySegments.begin(); + it != mDiscontinuitySegments.end(); + ++it) { + const DiscontinuitySegment &seg = *it; + // dequeued access units should be a subset of enqueued access units + // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs); + durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs); } - if (startTimeUs < 0) { - return 0; - } - - it = mBuffers.end(); - --it; - buffer = *it; - - int64_t endTimeUs; - buffer->meta()->findInt64("timeUs", &endTimeUs); - if (endTimeUs < 0) { - return 0; - } - - int64_t diffUs; - if (endTimeUs > startTimeUs) { - diffUs = endTimeUs - startTimeUs; - } else { - diffUs = startTimeUs - endTimeUs; - } - return diffUs; + return durationUs; } status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) { @@ -540,14 +503,15 @@ void AnotherPacketSource::trimBuffersAfterMeta( stopTime.mSeq, (long long)stopTime.mTimeUs); List >::iterator it; + List::iterator it2; sp newLatestEnqueuedMeta = NULL; int64_t newLastQueuedTimeUs = 0; - size_t newDiscontinuityCount = 0; - for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { + for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) { const sp &buffer = *it; int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { - newDiscontinuityCount++; + // CHECK(it2 != mDiscontinuitySegments.end()); + ++it2; continue; } @@ -560,10 +524,18 @@ void AnotherPacketSource::trimBuffersAfterMeta( newLatestEnqueuedMeta = buffer->meta(); newLastQueuedTimeUs = curTime.mTimeUs; } + mBuffers.erase(it, mBuffers.end()); mLatestEnqueuedMeta = newLatestEnqueuedMeta; mLastQueuedTimeUs = newLastQueuedTimeUs; - mQueuedDiscontinuityCount = newDiscontinuityCount; + + DiscontinuitySegment &seg = *it2; + if (newLatestEnqueuedMeta != NULL) { + seg.mMaxEnqueTimeUs = newLastQueuedTimeUs; + } else { + seg.clear(); + } + mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end()); } /* @@ -580,6 +552,7 @@ sp AnotherPacketSource::trimBuffersBeforeMeta( startTime.mSeq, (long long)startTime.mTimeUs); sp firstMeta; + int64_t firstTimeUs = -1; Mutex::Autolock autoLock(mLock); if (mBuffers.empty()) { return NULL; @@ -589,14 +562,14 @@ sp AnotherPacketSource::trimBuffersBeforeMeta( bool isAvc = false; List >::iterator it; - size_t discontinuityCount = 0; for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { const sp &buffer = *it; int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { + mDiscontinuitySegments.erase(mDiscontinuitySegments.begin()); + // CHECK(!mDiscontinuitySegments.empty()); format = NULL; isAvc = false; - discontinuityCount++; continue; } if (format == NULL) { @@ -618,12 +591,21 @@ sp AnotherPacketSource::trimBuffersBeforeMeta( ALOGV("trimming from beginning to %lld (not inclusive)", (long long)curTime.mTimeUs); firstMeta = buffer->meta(); + firstTimeUs = curTime.mTimeUs; break; } } mBuffers.erase(mBuffers.begin(), it); - mQueuedDiscontinuityCount -= discontinuityCount; mLatestDequeuedMeta = NULL; + + // CHECK(!mDiscontinuitySegments.empty()); + DiscontinuitySegment &seg = *mDiscontinuitySegments.begin(); + if (firstTimeUs >= 0) { + seg.mMaxDequeTimeUs = firstTimeUs; + } else { + seg.clear(); + } + return firstMeta; } diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.h b/media/libstagefright/mpeg2ts/AnotherPacketSource.h index 08cd92e..eb9dc9b 100644 --- a/media/libstagefright/mpeg2ts/AnotherPacketSource.h +++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.h @@ -53,8 +53,6 @@ struct AnotherPacketSource : public MediaSource { // presentation timestamps since the last discontinuity (if any). int64_t getBufferedDurationUs(status_t *finalResult); - int64_t getEstimatedDurationUs(); - status_t nextBufferTime(int64_t *timeUs); void queueAccessUnit(const sp &buffer); @@ -84,6 +82,25 @@ protected: virtual ~AnotherPacketSource(); private: + + struct DiscontinuitySegment { + int64_t mMaxDequeTimeUs, mMaxEnqueTimeUs; + DiscontinuitySegment() + : mMaxDequeTimeUs(-1), + mMaxEnqueTimeUs(-1) { + }; + + void clear() { + mMaxDequeTimeUs = mMaxEnqueTimeUs = -1; + } + }; + + // Discontinuity segments are consecutive access units between + // discontinuity markers. There should always be at least _ONE_ + // discontinuity segment, hence the various CHECKs in + // AnotherPacketSource.cpp for non-empty()-ness. + List mDiscontinuitySegments; + Mutex mLock; Condition mCondition; @@ -97,10 +114,7 @@ private: sp mLatestEnqueuedMeta; sp mLatestDequeuedMeta; - size_t mQueuedDiscontinuityCount; - bool wasFormatChange(int32_t discontinuityType) const; - int64_t getBufferedDurationUs_l(status_t *finalResult); DISALLOW_EVIL_CONSTRUCTORS(AnotherPacketSource); }; -- cgit v1.1