summaryrefslogtreecommitdiffstats
path: root/media/libstagefright/NuCachedSource2.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'media/libstagefright/NuCachedSource2.cpp')
-rw-r--r--media/libstagefright/NuCachedSource2.cpp703
1 files changed, 703 insertions, 0 deletions
diff --git a/media/libstagefright/NuCachedSource2.cpp b/media/libstagefright/NuCachedSource2.cpp
new file mode 100644
index 0000000..0957426
--- /dev/null
+++ b/media/libstagefright/NuCachedSource2.cpp
@@ -0,0 +1,703 @@
+/*
+ * Copyright (C) 2010 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#define LOG_NDEBUG 0
+#define LOG_TAG "NuCachedSource2"
+#include <utils/Log.h>
+
+#include "include/NuCachedSource2.h"
+#include "include/HTTPBase.h"
+
+#include <cutils/properties.h>
+#include <media/stagefright/foundation/ADebug.h>
+#include <media/stagefright/foundation/AMessage.h>
+#include <media/stagefright/MediaErrors.h>
+
+namespace android {
+
+struct PageCache {
+ PageCache(size_t pageSize);
+ ~PageCache();
+
+ struct Page {
+ void *mData;
+ size_t mSize;
+ };
+
+ Page *acquirePage();
+ void releasePage(Page *page);
+
+ void appendPage(Page *page);
+ size_t releaseFromStart(size_t maxBytes);
+
+ size_t totalSize() const {
+ return mTotalSize;
+ }
+
+ void copy(size_t from, void *data, size_t size);
+
+private:
+ size_t mPageSize;
+ size_t mTotalSize;
+
+ List<Page *> mActivePages;
+ List<Page *> mFreePages;
+
+ void freePages(List<Page *> *list);
+
+ DISALLOW_EVIL_CONSTRUCTORS(PageCache);
+};
+
+PageCache::PageCache(size_t pageSize)
+ : mPageSize(pageSize),
+ mTotalSize(0) {
+}
+
+PageCache::~PageCache() {
+ freePages(&mActivePages);
+ freePages(&mFreePages);
+}
+
+void PageCache::freePages(List<Page *> *list) {
+ List<Page *>::iterator it = list->begin();
+ while (it != list->end()) {
+ Page *page = *it;
+
+ free(page->mData);
+ delete page;
+ page = NULL;
+
+ ++it;
+ }
+}
+
+PageCache::Page *PageCache::acquirePage() {
+ if (!mFreePages.empty()) {
+ List<Page *>::iterator it = mFreePages.begin();
+ Page *page = *it;
+ mFreePages.erase(it);
+
+ return page;
+ }
+
+ Page *page = new Page;
+ page->mData = malloc(mPageSize);
+ page->mSize = 0;
+
+ return page;
+}
+
+void PageCache::releasePage(Page *page) {
+ page->mSize = 0;
+ mFreePages.push_back(page);
+}
+
+void PageCache::appendPage(Page *page) {
+ mTotalSize += page->mSize;
+ mActivePages.push_back(page);
+}
+
+size_t PageCache::releaseFromStart(size_t maxBytes) {
+ size_t bytesReleased = 0;
+
+ while (maxBytes > 0 && !mActivePages.empty()) {
+ List<Page *>::iterator it = mActivePages.begin();
+
+ Page *page = *it;
+
+ if (maxBytes < page->mSize) {
+ break;
+ }
+
+ mActivePages.erase(it);
+
+ maxBytes -= page->mSize;
+ bytesReleased += page->mSize;
+
+ releasePage(page);
+ }
+
+ mTotalSize -= bytesReleased;
+ return bytesReleased;
+}
+
+void PageCache::copy(size_t from, void *data, size_t size) {
+ ALOGV("copy from %d size %d", from, size);
+
+ if (size == 0) {
+ return;
+ }
+
+ CHECK_LE(from + size, mTotalSize);
+
+ size_t offset = 0;
+ List<Page *>::iterator it = mActivePages.begin();
+ while (from >= offset + (*it)->mSize) {
+ offset += (*it)->mSize;
+ ++it;
+ }
+
+ size_t delta = from - offset;
+ size_t avail = (*it)->mSize - delta;
+
+ if (avail >= size) {
+ memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
+ return;
+ }
+
+ memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
+ ++it;
+ data = (uint8_t *)data + avail;
+ size -= avail;
+
+ while (size > 0) {
+ size_t copy = (*it)->mSize;
+ if (copy > size) {
+ copy = size;
+ }
+ memcpy(data, (*it)->mData, copy);
+ data = (uint8_t *)data + copy;
+ size -= copy;
+ ++it;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+NuCachedSource2::NuCachedSource2(
+ const sp<DataSource> &source,
+ const char *cacheConfig,
+ bool disconnectAtHighwatermark)
+ : mSource(source),
+ mReflector(new AHandlerReflector<NuCachedSource2>(this)),
+ mLooper(new ALooper),
+ mCache(new PageCache(kPageSize)),
+ mCacheOffset(0),
+ mFinalStatus(OK),
+ mLastAccessPos(0),
+ mFetching(true),
+ mLastFetchTimeUs(-1),
+ mNumRetriesLeft(kMaxNumRetries),
+ mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
+ mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
+ mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
+ mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
+ // We are NOT going to support disconnect-at-highwatermark indefinitely
+ // and we are not guaranteeing support for client-specified cache
+ // parameters. Both of these are temporary measures to solve a specific
+ // problem that will be solved in a better way going forward.
+
+ updateCacheParamsFromSystemProperty();
+
+ if (cacheConfig != NULL) {
+ updateCacheParamsFromString(cacheConfig);
+ }
+
+ if (mDisconnectAtHighwatermark) {
+ // Makes no sense to disconnect and do keep-alives...
+ mKeepAliveIntervalUs = 0;
+ }
+
+ mLooper->setName("NuCachedSource2");
+ mLooper->registerHandler(mReflector);
+ mLooper->start();
+
+ Mutex::Autolock autoLock(mLock);
+ (new AMessage(kWhatFetchMore, mReflector->id()))->post();
+}
+
+NuCachedSource2::~NuCachedSource2() {
+ mLooper->stop();
+ mLooper->unregisterHandler(mReflector->id());
+
+ delete mCache;
+ mCache = NULL;
+}
+
+status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
+ if (mSource->flags() & kIsHTTPBasedSource) {
+ HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
+ return source->getEstimatedBandwidthKbps(kbps);
+ }
+ return ERROR_UNSUPPORTED;
+}
+
+status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
+ if (mSource->flags() & kIsHTTPBasedSource) {
+ HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
+ return source->setBandwidthStatCollectFreq(freqMs);
+ }
+ return ERROR_UNSUPPORTED;
+}
+
+status_t NuCachedSource2::initCheck() const {
+ return mSource->initCheck();
+}
+
+status_t NuCachedSource2::getSize(off64_t *size) {
+ return mSource->getSize(size);
+}
+
+uint32_t NuCachedSource2::flags() {
+ // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
+ uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
+ return (flags | kIsCachingDataSource);
+}
+
+void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
+ switch (msg->what()) {
+ case kWhatFetchMore:
+ {
+ onFetch();
+ break;
+ }
+
+ case kWhatRead:
+ {
+ onRead(msg);
+ break;
+ }
+
+ default:
+ TRESPASS();
+ }
+}
+
+void NuCachedSource2::fetchInternal() {
+ ALOGV("fetchInternal");
+
+ bool reconnect = false;
+
+ {
+ Mutex::Autolock autoLock(mLock);
+ CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
+
+ if (mFinalStatus != OK) {
+ --mNumRetriesLeft;
+
+ reconnect = true;
+ }
+ }
+
+ if (reconnect) {
+ status_t err =
+ mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
+
+ Mutex::Autolock autoLock(mLock);
+
+ if (err == ERROR_UNSUPPORTED) {
+ mNumRetriesLeft = 0;
+ return;
+ } else if (err != OK) {
+ ALOGI("The attempt to reconnect failed, %d retries remaining",
+ mNumRetriesLeft);
+
+ return;
+ }
+ }
+
+ PageCache::Page *page = mCache->acquirePage();
+
+ ssize_t n = mSource->readAt(
+ mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
+
+ Mutex::Autolock autoLock(mLock);
+
+ if (n < 0) {
+ ALOGE("source returned error %ld, %d retries left", n, mNumRetriesLeft);
+ mFinalStatus = n;
+ mCache->releasePage(page);
+ } else if (n == 0) {
+ ALOGI("ERROR_END_OF_STREAM");
+
+ mNumRetriesLeft = 0;
+ mFinalStatus = ERROR_END_OF_STREAM;
+
+ mCache->releasePage(page);
+ } else {
+ if (mFinalStatus != OK) {
+ ALOGI("retrying a previously failed read succeeded.");
+ }
+ mNumRetriesLeft = kMaxNumRetries;
+ mFinalStatus = OK;
+
+ page->mSize = n;
+ mCache->appendPage(page);
+ }
+}
+
+void NuCachedSource2::onFetch() {
+ ALOGV("onFetch");
+
+ if (mFinalStatus != OK && mNumRetriesLeft == 0) {
+ ALOGV("EOS reached, done prefetching for now");
+ mFetching = false;
+ }
+
+ bool keepAlive =
+ !mFetching
+ && mFinalStatus == OK
+ && mKeepAliveIntervalUs > 0
+ && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
+
+ if (mFetching || keepAlive) {
+ if (keepAlive) {
+ ALOGI("Keep alive");
+ }
+
+ fetchInternal();
+
+ mLastFetchTimeUs = ALooper::GetNowUs();
+
+ if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
+ ALOGI("Cache full, done prefetching for now");
+ mFetching = false;
+
+ if (mDisconnectAtHighwatermark
+ && (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
+ ALOGV("Disconnecting at high watermark");
+ static_cast<HTTPBase *>(mSource.get())->disconnect();
+ mFinalStatus = -EAGAIN;
+ }
+ }
+ } else {
+ Mutex::Autolock autoLock(mLock);
+ restartPrefetcherIfNecessary_l();
+ }
+
+ int64_t delayUs;
+ if (mFetching) {
+ if (mFinalStatus != OK && mNumRetriesLeft > 0) {
+ // We failed this time and will try again in 3 seconds.
+ delayUs = 3000000ll;
+ } else {
+ delayUs = 0;
+ }
+ } else {
+ delayUs = 100000ll;
+ }
+
+ (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs);
+}
+
+void NuCachedSource2::onRead(const sp<AMessage> &msg) {
+ ALOGV("onRead");
+
+ int64_t offset;
+ CHECK(msg->findInt64("offset", &offset));
+
+ void *data;
+ CHECK(msg->findPointer("data", &data));
+
+ size_t size;
+ CHECK(msg->findSize("size", &size));
+
+ ssize_t result = readInternal(offset, data, size);
+
+ if (result == -EAGAIN) {
+ msg->post(50000);
+ return;
+ }
+
+ Mutex::Autolock autoLock(mLock);
+
+ CHECK(mAsyncResult == NULL);
+
+ mAsyncResult = new AMessage;
+ mAsyncResult->setInt32("result", result);
+
+ mCondition.signal();
+}
+
+void NuCachedSource2::restartPrefetcherIfNecessary_l(
+ bool ignoreLowWaterThreshold, bool force) {
+ static const size_t kGrayArea = 1024 * 1024;
+
+ if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
+ return;
+ }
+
+ if (!ignoreLowWaterThreshold && !force
+ && mCacheOffset + mCache->totalSize() - mLastAccessPos
+ >= mLowwaterThresholdBytes) {
+ return;
+ }
+
+ size_t maxBytes = mLastAccessPos - mCacheOffset;
+
+ if (!force) {
+ if (maxBytes < kGrayArea) {
+ return;
+ }
+
+ maxBytes -= kGrayArea;
+ }
+
+ size_t actualBytes = mCache->releaseFromStart(maxBytes);
+ mCacheOffset += actualBytes;
+
+ ALOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
+ mFetching = true;
+}
+
+ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
+ Mutex::Autolock autoSerializer(mSerializer);
+
+ ALOGV("readAt offset %lld, size %d", offset, size);
+
+ Mutex::Autolock autoLock(mLock);
+
+ // If the request can be completely satisfied from the cache, do so.
+
+ if (offset >= mCacheOffset
+ && offset + size <= mCacheOffset + mCache->totalSize()) {
+ size_t delta = offset - mCacheOffset;
+ mCache->copy(delta, data, size);
+
+ mLastAccessPos = offset + size;
+
+ return size;
+ }
+
+ sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
+ msg->setInt64("offset", offset);
+ msg->setPointer("data", data);
+ msg->setSize("size", size);
+
+ CHECK(mAsyncResult == NULL);
+ msg->post();
+
+ while (mAsyncResult == NULL) {
+ mCondition.wait(mLock);
+ }
+
+ int32_t result;
+ CHECK(mAsyncResult->findInt32("result", &result));
+
+ mAsyncResult.clear();
+
+ if (result > 0) {
+ mLastAccessPos = offset + result;
+ }
+
+ return (ssize_t)result;
+}
+
+size_t NuCachedSource2::cachedSize() {
+ Mutex::Autolock autoLock(mLock);
+ return mCacheOffset + mCache->totalSize();
+}
+
+size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) {
+ Mutex::Autolock autoLock(mLock);
+ return approxDataRemaining_l(finalStatus);
+}
+
+size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) {
+ *finalStatus = mFinalStatus;
+
+ if (mFinalStatus != OK && mNumRetriesLeft > 0) {
+ // Pretend that everything is fine until we're out of retries.
+ *finalStatus = OK;
+ }
+
+ off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
+ if (mLastAccessPos < lastBytePosCached) {
+ return lastBytePosCached - mLastAccessPos;
+ }
+ return 0;
+}
+
+ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
+ CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
+
+ ALOGV("readInternal offset %lld size %d", offset, size);
+
+ Mutex::Autolock autoLock(mLock);
+
+ if (!mFetching) {
+ mLastAccessPos = offset;
+ restartPrefetcherIfNecessary_l(
+ false, // ignoreLowWaterThreshold
+ true); // force
+ }
+
+ if (offset < mCacheOffset
+ || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
+ static const off64_t kPadding = 256 * 1024;
+
+ // In the presence of multiple decoded streams, once of them will
+ // trigger this seek request, the other one will request data "nearby"
+ // soon, adjust the seek position so that that subsequent request
+ // does not trigger another seek.
+ off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
+
+ seekInternal_l(seekOffset);
+ }
+
+ size_t delta = offset - mCacheOffset;
+
+ if (mFinalStatus != OK && mNumRetriesLeft == 0) {
+ if (delta >= mCache->totalSize()) {
+ return mFinalStatus;
+ }
+
+ size_t avail = mCache->totalSize() - delta;
+
+ if (avail > size) {
+ avail = size;
+ }
+
+ mCache->copy(delta, data, avail);
+
+ return avail;
+ }
+
+ if (offset + size <= mCacheOffset + mCache->totalSize()) {
+ mCache->copy(delta, data, size);
+
+ return size;
+ }
+
+ ALOGV("deferring read");
+
+ return -EAGAIN;
+}
+
+status_t NuCachedSource2::seekInternal_l(off64_t offset) {
+ mLastAccessPos = offset;
+
+ if (offset >= mCacheOffset
+ && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
+ return OK;
+ }
+
+ ALOGI("new range: offset= %lld", offset);
+
+ mCacheOffset = offset;
+
+ size_t totalSize = mCache->totalSize();
+ CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
+
+ mNumRetriesLeft = kMaxNumRetries;
+ mFetching = true;
+
+ return OK;
+}
+
+void NuCachedSource2::resumeFetchingIfNecessary() {
+ Mutex::Autolock autoLock(mLock);
+
+ restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
+}
+
+sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) {
+ return mSource->DrmInitialization(mime);
+}
+
+void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
+ mSource->getDrmInfo(handle, client);
+}
+
+String8 NuCachedSource2::getUri() {
+ return mSource->getUri();
+}
+
+String8 NuCachedSource2::getMIMEType() const {
+ return mSource->getMIMEType();
+}
+
+void NuCachedSource2::updateCacheParamsFromSystemProperty() {
+ char value[PROPERTY_VALUE_MAX];
+ if (!property_get("media.stagefright.cache-params", value, NULL)) {
+ return;
+ }
+
+ updateCacheParamsFromString(value);
+}
+
+void NuCachedSource2::updateCacheParamsFromString(const char *s) {
+ ssize_t lowwaterMarkKb, highwaterMarkKb;
+ int keepAliveSecs;
+
+ if (sscanf(s, "%ld/%ld/%d",
+ &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
+ ALOGE("Failed to parse cache parameters from '%s'.", s);
+ return;
+ }
+
+ if (lowwaterMarkKb >= 0) {
+ mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
+ } else {
+ mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
+ }
+
+ if (highwaterMarkKb >= 0) {
+ mHighwaterThresholdBytes = highwaterMarkKb * 1024;
+ } else {
+ mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
+ }
+
+ if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
+ ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
+
+ mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
+ mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
+ }
+
+ if (keepAliveSecs >= 0) {
+ mKeepAliveIntervalUs = keepAliveSecs * 1000000ll;
+ } else {
+ mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
+ }
+
+ ALOGV("lowwater = %d bytes, highwater = %d bytes, keepalive = %lld us",
+ mLowwaterThresholdBytes,
+ mHighwaterThresholdBytes,
+ mKeepAliveIntervalUs);
+}
+
+// static
+void NuCachedSource2::RemoveCacheSpecificHeaders(
+ KeyedVector<String8, String8> *headers,
+ String8 *cacheConfig,
+ bool *disconnectAtHighwatermark) {
+ *cacheConfig = String8();
+ *disconnectAtHighwatermark = false;
+
+ if (headers == NULL) {
+ return;
+ }
+
+ ssize_t index;
+ if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
+ *cacheConfig = headers->valueAt(index);
+
+ headers->removeItemsAt(index);
+
+ ALOGV("Using special cache config '%s'", cacheConfig->string());
+ }
+
+ if ((index = headers->indexOfKey(
+ String8("x-disconnect-at-highwatermark"))) >= 0) {
+ *disconnectAtHighwatermark = true;
+ headers->removeItemsAt(index);
+
+ ALOGV("Client requested disconnection at highwater mark");
+ }
+}
+
+} // namespace android