/* * Copyright (C) 2010 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "AnotherPacketSource" #include "AnotherPacketSource.h" #include "include/avc_utils.h" #include #include #include #include #include #include #include #include #include #include #include namespace android { const int64_t kNearEOSMarkUs = 2000000ll; // 2 secs AnotherPacketSource::AnotherPacketSource(const sp &meta) : mIsAudio(false), mIsVideo(false), mEnabled(true), mFormat(NULL), mLastQueuedTimeUs(0), mEOSResult(OK), mLatestEnqueuedMeta(NULL), mLatestDequeuedMeta(NULL) { setFormat(meta); mDiscontinuitySegments.push_back(DiscontinuitySegment()); } void AnotherPacketSource::setFormat(const sp &meta) { if (mFormat != NULL) { // Only allowed to be set once. Requires explicit clear to reset. return; } mIsAudio = false; mIsVideo = false; if (meta == NULL) { return; } mFormat = meta; const char *mime; CHECK(meta->findCString(kKeyMIMEType, &mime)); if (!strncasecmp("audio/", mime, 6)) { mIsAudio = true; } else if (!strncasecmp("video/", mime, 6)) { mIsVideo = true; } else { CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12)); } } AnotherPacketSource::~AnotherPacketSource() { } status_t AnotherPacketSource::start(MetaData * /* params */) { return OK; } status_t AnotherPacketSource::stop() { return OK; } sp AnotherPacketSource::getFormat() { Mutex::Autolock autoLock(mLock); if (mFormat != NULL) { return mFormat; } List >::iterator it = mBuffers.begin(); while (it != mBuffers.end()) { sp buffer = *it; int32_t discontinuity; if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) { sp object; if (buffer->meta()->findObject("format", &object)) { setFormat(static_cast(object.get())); return mFormat; } } ++it; } return NULL; } status_t AnotherPacketSource::dequeueAccessUnit(sp *buffer) { buffer->clear(); Mutex::Autolock autoLock(mLock); while (mEOSResult == OK && mBuffers.empty()) { mCondition.wait(mLock); } if (!mBuffers.empty()) { *buffer = *mBuffers.begin(); mBuffers.erase(mBuffers.begin()); int32_t discontinuity; if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) { if (wasFormatChange(discontinuity)) { mFormat.clear(); } mDiscontinuitySegments.erase(mDiscontinuitySegments.begin()); // CHECK(!mDiscontinuitySegments.empty()); return INFO_DISCONTINUITY; } // CHECK(!mDiscontinuitySegments.empty()); DiscontinuitySegment &seg = *mDiscontinuitySegments.begin(); int64_t timeUs; mLatestDequeuedMeta = (*buffer)->meta()->dup(); CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs)); if (timeUs > seg.mMaxDequeTimeUs) { seg.mMaxDequeTimeUs = timeUs; } sp object; if ((*buffer)->meta()->findObject("format", &object)) { setFormat(static_cast(object.get())); } return OK; } return mEOSResult; } void AnotherPacketSource::requeueAccessUnit(const sp &buffer) { // TODO: update corresponding book keeping info. Mutex::Autolock autoLock(mLock); mBuffers.push_front(buffer); } status_t AnotherPacketSource::read( MediaBuffer **out, const ReadOptions *) { *out = NULL; Mutex::Autolock autoLock(mLock); while (mEOSResult == OK && mBuffers.empty()) { mCondition.wait(mLock); } if (!mBuffers.empty()) { const sp buffer = *mBuffers.begin(); mBuffers.erase(mBuffers.begin()); int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { if (wasFormatChange(discontinuity)) { mFormat.clear(); } mDiscontinuitySegments.erase(mDiscontinuitySegments.begin()); // CHECK(!mDiscontinuitySegments.empty()); return INFO_DISCONTINUITY; } mLatestDequeuedMeta = buffer->meta()->dup(); sp object; if (buffer->meta()->findObject("format", &object)) { setFormat(static_cast(object.get())); } int64_t timeUs; CHECK(buffer->meta()->findInt64("timeUs", &timeUs)); // CHECK(!mDiscontinuitySegments.empty()); DiscontinuitySegment &seg = *mDiscontinuitySegments.begin(); if (timeUs > seg.mMaxDequeTimeUs) { seg.mMaxDequeTimeUs = timeUs; } MediaBuffer *mediaBuffer = new MediaBuffer(buffer); mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs); int32_t isSync; if (buffer->meta()->findInt32("isSync", &isSync)) { mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync); } sp sei; if (buffer->meta()->findBuffer("sei", &sei) && sei != NULL) { mediaBuffer->meta_data()->setData(kKeySEI, 0, sei->data(), sei->size()); } *out = mediaBuffer; return OK; } return mEOSResult; } bool AnotherPacketSource::wasFormatChange( int32_t discontinuityType) const { if (mIsAudio) { return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0; } if (mIsVideo) { return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0; } return false; } void AnotherPacketSource::queueAccessUnit(const sp &buffer) { int32_t damaged; if (buffer->meta()->findInt32("damaged", &damaged) && damaged) { // LOG(VERBOSE) << "discarding damaged AU"; return; } Mutex::Autolock autoLock(mLock); mBuffers.push_back(buffer); mCondition.signal(); int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)){ ALOGV("queueing a discontinuity with queueAccessUnit"); mLastQueuedTimeUs = 0ll; mEOSResult = OK; mLatestEnqueuedMeta = NULL; mDiscontinuitySegments.push_back(DiscontinuitySegment()); return; } int64_t lastQueuedTimeUs; CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs)); mLastQueuedTimeUs = lastQueuedTimeUs; ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)", mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6); // CHECK(!mDiscontinuitySegments.empty()); DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end()); if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) { tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs; } if (tailSeg.mMaxDequeTimeUs == -1) { tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs; } if (mLatestEnqueuedMeta == NULL) { mLatestEnqueuedMeta = buffer->meta()->dup(); } else { int64_t latestTimeUs = 0; int64_t frameDeltaUs = 0; CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs)); if (lastQueuedTimeUs > latestTimeUs) { mLatestEnqueuedMeta = buffer->meta()->dup(); frameDeltaUs = lastQueuedTimeUs - latestTimeUs; mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) { // For B frames frameDeltaUs = latestTimeUs - lastQueuedTimeUs; mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); } } } void AnotherPacketSource::clear() { Mutex::Autolock autoLock(mLock); mBuffers.clear(); mEOSResult = OK; mDiscontinuitySegments.clear(); mDiscontinuitySegments.push_back(DiscontinuitySegment()); mFormat = NULL; mLatestEnqueuedMeta = NULL; } void AnotherPacketSource::queueDiscontinuity( ATSParser::DiscontinuityType type, const sp &extra, bool discard) { Mutex::Autolock autoLock(mLock); if (discard) { // Leave only discontinuities in the queue. List >::iterator it = mBuffers.begin(); while (it != mBuffers.end()) { sp oldBuffer = *it; int32_t oldDiscontinuityType; if (!oldBuffer->meta()->findInt32( "discontinuity", &oldDiscontinuityType)) { it = mBuffers.erase(it); continue; } ++it; } for (List::iterator it2 = mDiscontinuitySegments.begin(); it2 != mDiscontinuitySegments.end(); ++it2) { DiscontinuitySegment &seg = *it2; seg.clear(); } } mEOSResult = OK; mLastQueuedTimeUs = 0; mLatestEnqueuedMeta = NULL; if (type == ATSParser::DISCONTINUITY_NONE) { return; } mDiscontinuitySegments.push_back(DiscontinuitySegment()); sp buffer = new ABuffer(0); buffer->meta()->setInt32("discontinuity", static_cast(type)); buffer->meta()->setMessage("extra", extra); mBuffers.push_back(buffer); mCondition.signal(); } void AnotherPacketSource::signalEOS(status_t result) { CHECK(result != OK); Mutex::Autolock autoLock(mLock); mEOSResult = result; mCondition.signal(); } bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) { Mutex::Autolock autoLock(mLock); *finalResult = OK; if (!mEnabled) { return false; } if (!mBuffers.empty()) { return true; } *finalResult = mEOSResult; return false; } bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) { Mutex::Autolock autoLock(mLock); *finalResult = OK; if (!mEnabled) { return false; } List >::iterator it; for (it = mBuffers.begin(); it != mBuffers.end(); it++) { int32_t discontinuity; if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) { return true; } } *finalResult = mEOSResult; return false; } size_t AnotherPacketSource::getAvailableBufferCount(status_t *finalResult) { Mutex::Autolock autoLock(mLock); *finalResult = OK; if (!mEnabled) { return 0; } if (!mBuffers.empty()) { return mBuffers.size(); } *finalResult = mEOSResult; return 0; } int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) { Mutex::Autolock autoLock(mLock); *finalResult = mEOSResult; int64_t durationUs = 0; for (List::iterator it = mDiscontinuitySegments.begin(); it != mDiscontinuitySegments.end(); ++it) { const DiscontinuitySegment &seg = *it; // dequeued access units should be a subset of enqueued access units // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs); durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs); } return durationUs; } status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) { *timeUs = 0; Mutex::Autolock autoLock(mLock); if (mBuffers.empty()) { return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK; } sp buffer = *mBuffers.begin(); CHECK(buffer->meta()->findInt64("timeUs", timeUs)); return OK; } bool AnotherPacketSource::isFinished(int64_t duration) const { if (duration > 0) { int64_t diff = duration - mLastQueuedTimeUs; if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) { ALOGV("Detecting EOS due to near end"); return true; } } return (mEOSResult != OK); } sp AnotherPacketSource::getLatestEnqueuedMeta() { Mutex::Autolock autoLock(mLock); return mLatestEnqueuedMeta; } sp AnotherPacketSource::getLatestDequeuedMeta() { Mutex::Autolock autoLock(mLock); return mLatestDequeuedMeta; } void AnotherPacketSource::enable(bool enable) { Mutex::Autolock autoLock(mLock); mEnabled = enable; } /* * returns the sample meta that's delayUs after queue head * (NULL if such sample is unavailable) */ sp AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) { Mutex::Autolock autoLock(mLock); int64_t firstUs = -1; int64_t lastUs = -1; int64_t durationUs = 0; List >::iterator it; for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { const sp &buffer = *it; int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { durationUs += lastUs - firstUs; firstUs = -1; lastUs = -1; continue; } int64_t timeUs; if (buffer->meta()->findInt64("timeUs", &timeUs)) { if (firstUs < 0) { firstUs = timeUs; } if (lastUs < 0 || timeUs > lastUs) { lastUs = timeUs; } if (durationUs + (lastUs - firstUs) >= delayUs) { return buffer->meta(); } } } return NULL; } /* * removes samples with time equal or after meta */ void AnotherPacketSource::trimBuffersAfterMeta( const sp &meta) { if (meta == NULL) { ALOGW("trimming with NULL meta, ignoring"); return; } Mutex::Autolock autoLock(mLock); if (mBuffers.empty()) { return; } HLSTime stopTime(meta); ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld", stopTime.mSeq, (long long)stopTime.mTimeUs); List >::iterator it; List::iterator it2; sp newLatestEnqueuedMeta = NULL; int64_t newLastQueuedTimeUs = 0; for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) { const sp &buffer = *it; int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { // CHECK(it2 != mDiscontinuitySegments.end()); ++it2; continue; } HLSTime curTime(buffer->meta()); if (!(curTime < stopTime)) { ALOGV("trimming from %lld (inclusive) to end", (long long)curTime.mTimeUs); break; } newLatestEnqueuedMeta = buffer->meta(); newLastQueuedTimeUs = curTime.mTimeUs; } mBuffers.erase(it, mBuffers.end()); mLatestEnqueuedMeta = newLatestEnqueuedMeta; mLastQueuedTimeUs = newLastQueuedTimeUs; DiscontinuitySegment &seg = *it2; if (newLatestEnqueuedMeta != NULL) { seg.mMaxEnqueTimeUs = newLastQueuedTimeUs; } else { seg.clear(); } mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end()); } /* * removes samples with time equal or before meta; * returns first sample left in the queue. * * (for AVC, if trim happens, the samples left will always start * at next IDR.) */ sp AnotherPacketSource::trimBuffersBeforeMeta( const sp &meta) { HLSTime startTime(meta); ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld", startTime.mSeq, (long long)startTime.mTimeUs); sp firstMeta; int64_t firstTimeUs = -1; Mutex::Autolock autoLock(mLock); if (mBuffers.empty()) { return NULL; } sp format; bool isAvc = false; List >::iterator it; for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { const sp &buffer = *it; int32_t discontinuity; if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { mDiscontinuitySegments.erase(mDiscontinuitySegments.begin()); // CHECK(!mDiscontinuitySegments.empty()); format = NULL; isAvc = false; continue; } if (format == NULL) { sp object; if (buffer->meta()->findObject("format", &object)) { const char* mime; format = static_cast(object.get()); isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime) && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC); } } if (isAvc && !IsIDR(buffer)) { continue; } HLSTime curTime(buffer->meta()); if (startTime < curTime) { ALOGV("trimming from beginning to %lld (not inclusive)", (long long)curTime.mTimeUs); firstMeta = buffer->meta(); firstTimeUs = curTime.mTimeUs; break; } } mBuffers.erase(mBuffers.begin(), it); mLatestDequeuedMeta = NULL; // CHECK(!mDiscontinuitySegments.empty()); DiscontinuitySegment &seg = *mDiscontinuitySegments.begin(); if (firstTimeUs >= 0) { seg.mMaxDequeTimeUs = firstTimeUs; } else { seg.clear(); } return firstMeta; } } // namespace android