diff options
Diffstat (limited to 'media/libstagefright/httplive/LiveSession.cpp')
-rw-r--r-- | media/libstagefright/httplive/LiveSession.cpp | 646 |
1 files changed, 646 insertions, 0 deletions
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp new file mode 100644 index 0000000..62567be --- /dev/null +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -0,0 +1,646 @@ +/* + * Copyright (C) 2010 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#define LOG_NDEBUG 0 +#define LOG_TAG "LiveSession" +#include <utils/Log.h> + +#include "include/LiveSession.h" + +#include "LiveDataSource.h" + +#include "include/M3UParser.h" +#include "include/NuHTTPDataSource.h" + +#include <cutils/properties.h> +#include <media/stagefright/foundation/hexdump.h> +#include <media/stagefright/foundation/ABuffer.h> +#include <media/stagefright/foundation/ADebug.h> +#include <media/stagefright/foundation/AMessage.h> +#include <media/stagefright/DataSource.h> +#include <media/stagefright/FileSource.h> +#include <media/stagefright/MediaErrors.h> + +#include <ctype.h> +#include <openssl/aes.h> + +namespace android { + +const int64_t LiveSession::kMaxPlaylistAgeUs = 15000000ll; + +LiveSession::LiveSession() + : mDataSource(new LiveDataSource), + mHTTPDataSource(new NuHTTPDataSource), + mPrevBandwidthIndex(-1), + mLastPlaylistFetchTimeUs(-1), + mSeqNumber(-1), + mSeekTimeUs(-1), + mNumRetries(0), + mDurationUs(-1), + mSeekDone(false), + mMonitorQueueGeneration(0) { +} + +LiveSession::~LiveSession() { +} + +sp<DataSource> LiveSession::getDataSource() { + return mDataSource; +} + +void LiveSession::connect(const char *url) { + sp<AMessage> msg = new AMessage(kWhatConnect, id()); + msg->setString("url", url); + msg->post(); +} + +void LiveSession::disconnect() { + (new AMessage(kWhatDisconnect, id()))->post(); +} + +void LiveSession::seekTo(int64_t timeUs) { + Mutex::Autolock autoLock(mLock); + mSeekDone = false; + + sp<AMessage> msg = new AMessage(kWhatSeek, id()); + msg->setInt64("timeUs", timeUs); + msg->post(); + + while (!mSeekDone) { + mCondition.wait(mLock); + } +} + +void LiveSession::onMessageReceived(const sp<AMessage> &msg) { + switch (msg->what()) { + case kWhatConnect: + onConnect(msg); + break; + + case kWhatDisconnect: + onDisconnect(); + break; + + case kWhatMonitorQueue: + { + int32_t generation; + CHECK(msg->findInt32("generation", &generation)); + + if (generation != mMonitorQueueGeneration) { + // Stale event + break; + } + + onMonitorQueue(); + break; + } + + case kWhatSeek: + onSeek(msg); + break; + + default: + TRESPASS(); + break; + } +} + +// static +int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { + if (a->mBandwidth < b->mBandwidth) { + return -1; + } else if (a->mBandwidth == b->mBandwidth) { + return 0; + } + + return 1; +} + +void LiveSession::onConnect(const sp<AMessage> &msg) { + AString url; + CHECK(msg->findString("url", &url)); + + LOGI("onConnect '%s'", url.c_str()); + + mMasterURL = url; + + sp<M3UParser> playlist = fetchPlaylist(url.c_str()); + CHECK(playlist != NULL); + + if (playlist->isVariantPlaylist()) { + for (size_t i = 0; i < playlist->size(); ++i) { + BandwidthItem item; + + sp<AMessage> meta; + playlist->itemAt(i, &item.mURI, &meta); + + unsigned long bandwidth; + CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); + + mBandwidthItems.push(item); + } + + CHECK_GT(mBandwidthItems.size(), 0u); + + mBandwidthItems.sort(SortByBandwidth); + + if (mBandwidthItems.size() > 1) { + // XXX Remove the lowest bitrate stream for now... + mBandwidthItems.removeAt(0); + } + } + + postMonitorQueue(); +} + +void LiveSession::onDisconnect() { + LOGI("onDisconnect"); + + mDataSource->queueEOS(ERROR_END_OF_STREAM); +} + +status_t LiveSession::fetchFile(const char *url, sp<ABuffer> *out) { + *out = NULL; + + sp<DataSource> source; + + if (!strncasecmp(url, "file://", 7)) { + source = new FileSource(url + 7); + } else { + CHECK(!strncasecmp(url, "http://", 7)); + + status_t err = mHTTPDataSource->connect(url); + + if (err != OK) { + return err; + } + + source = mHTTPDataSource; + } + + off64_t size; + status_t err = source->getSize(&size); + + if (err != OK) { + size = 65536; + } + + sp<ABuffer> buffer = new ABuffer(size); + buffer->setRange(0, 0); + + for (;;) { + size_t bufferRemaining = buffer->capacity() - buffer->size(); + + if (bufferRemaining == 0) { + bufferRemaining = 32768; + + LOGV("increasing download buffer to %d bytes", + buffer->size() + bufferRemaining); + + sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); + memcpy(copy->data(), buffer->data(), buffer->size()); + copy->setRange(0, buffer->size()); + + buffer = copy; + } + + ssize_t n = source->readAt( + buffer->size(), buffer->data() + buffer->size(), + bufferRemaining); + + if (n < 0) { + return err; + } + + if (n == 0) { + break; + } + + buffer->setRange(0, buffer->size() + (size_t)n); + } + + *out = buffer; + + return OK; +} + +sp<M3UParser> LiveSession::fetchPlaylist(const char *url) { + sp<ABuffer> buffer; + status_t err = fetchFile(url, &buffer); + + if (err != OK) { + return NULL; + } + + sp<M3UParser> playlist = + new M3UParser(url, buffer->data(), buffer->size()); + + if (playlist->initCheck() != OK) { + return NULL; + } + + return playlist; +} + +static double uniformRand() { + return (double)rand() / RAND_MAX; +} + +size_t LiveSession::getBandwidthIndex() { + if (mBandwidthItems.size() == 0) { + return 0; + } + +#if 1 + int32_t bandwidthBps; + if (mHTTPDataSource != NULL + && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { + LOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); + } else { + LOGV("no bandwidth estimate."); + return 0; // Pick the lowest bandwidth stream by default. + } + + char value[PROPERTY_VALUE_MAX]; + if (property_get("media.httplive.max-bw", value, NULL)) { + char *end; + long maxBw = strtoul(value, &end, 10); + if (end > value && *end == '\0') { + if (maxBw > 0 && bandwidthBps > maxBw) { + LOGV("bandwidth capped to %ld bps", maxBw); + bandwidthBps = maxBw; + } + } + } + + // Consider only 80% of the available bandwidth usable. + bandwidthBps = (bandwidthBps * 8) / 10; + + // Pick the highest bandwidth stream below or equal to estimated bandwidth. + + size_t index = mBandwidthItems.size() - 1; + while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth + > (size_t)bandwidthBps) { + --index; + } +#elif 0 + // Change bandwidth at random() + size_t index = uniformRand() * mBandwidthItems.size(); +#elif 0 + // There's a 50% chance to stay on the current bandwidth and + // a 50% chance to switch to the next higher bandwidth (wrapping around + // to lowest) + const size_t kMinIndex = 0; + + size_t index; + if (mPrevBandwidthIndex < 0) { + index = kMinIndex; + } else if (uniformRand() < 0.5) { + index = (size_t)mPrevBandwidthIndex; + } else { + index = mPrevBandwidthIndex + 1; + if (index == mBandwidthItems.size()) { + index = kMinIndex; + } + } +#elif 0 + // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec + + size_t index = mBandwidthItems.size() - 1; + while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { + --index; + } +#else + size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream +#endif + + return index; +} + +void LiveSession::onDownloadNext() { + size_t bandwidthIndex = getBandwidthIndex(); + + int64_t nowUs = ALooper::GetNowUs(); + + if (mLastPlaylistFetchTimeUs < 0 + || (ssize_t)bandwidthIndex != mPrevBandwidthIndex + || (!mPlaylist->isComplete() + && mLastPlaylistFetchTimeUs + kMaxPlaylistAgeUs <= nowUs)) { + AString url; + if (mBandwidthItems.size() > 0) { + url = mBandwidthItems.editItemAt(bandwidthIndex).mURI; + } else { + url = mMasterURL; + } + + bool firstTime = (mPlaylist == NULL); + + mPlaylist = fetchPlaylist(url.c_str()); + CHECK(mPlaylist != NULL); + + if (firstTime) { + Mutex::Autolock autoLock(mLock); + + int32_t targetDuration; + if (!mPlaylist->isComplete() + || !mPlaylist->meta()->findInt32( + "target-duration", &targetDuration)) { + mDurationUs = -1; + } else { + mDurationUs = 1000000ll * targetDuration * mPlaylist->size(); + } + } + + mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); + } + + int32_t firstSeqNumberInPlaylist; + if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( + "media-sequence", &firstSeqNumberInPlaylist)) { + firstSeqNumberInPlaylist = 0; + } + + bool explicitDiscontinuity = false; + + if (mSeekTimeUs >= 0) { + int32_t targetDuration; + if (mPlaylist->isComplete() && + mPlaylist->meta()->findInt32( + "target-duration", &targetDuration)) { + int64_t seekTimeSecs = (mSeekTimeUs + 500000ll) / 1000000ll; + int64_t index = seekTimeSecs / targetDuration; + + if (index >= 0 && index < mPlaylist->size()) { + mSeqNumber = firstSeqNumberInPlaylist + index; + mDataSource->reset(); + + explicitDiscontinuity = true; + } + } + + mSeekTimeUs = -1; + + Mutex::Autolock autoLock(mLock); + mSeekDone = true; + mCondition.broadcast(); + } + + if (mSeqNumber < 0) { + if (mPlaylist->isComplete()) { + mSeqNumber = firstSeqNumberInPlaylist; + } else { + mSeqNumber = firstSeqNumberInPlaylist + mPlaylist->size() / 2; + } + } + + int32_t lastSeqNumberInPlaylist = + firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; + + if (mSeqNumber < firstSeqNumberInPlaylist + || mSeqNumber > lastSeqNumberInPlaylist) { + if (!mPlaylist->isComplete() + && mSeqNumber > lastSeqNumberInPlaylist + && mNumRetries < kMaxNumRetries) { + ++mNumRetries; + + mLastPlaylistFetchTimeUs = -1; + postMonitorQueue(1000000ll); + return; + } + + LOGE("Cannot find sequence number %d in playlist " + "(contains %d - %d)", + mSeqNumber, firstSeqNumberInPlaylist, + firstSeqNumberInPlaylist + mPlaylist->size() - 1); + + mDataSource->queueEOS(ERROR_END_OF_STREAM); + return; + } + + mNumRetries = 0; + + AString uri; + sp<AMessage> itemMeta; + CHECK(mPlaylist->itemAt( + mSeqNumber - firstSeqNumberInPlaylist, + &uri, + &itemMeta)); + + int32_t val; + if (itemMeta->findInt32("discontinuity", &val) && val != 0) { + explicitDiscontinuity = true; + } + + sp<ABuffer> buffer; + status_t err = fetchFile(uri.c_str(), &buffer); + CHECK_EQ(err, (status_t)OK); + + CHECK_EQ((status_t)OK, + decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer)); + + if (buffer->size() == 0 || buffer->data()[0] != 0x47) { + // Not a transport stream??? + + LOGE("This doesn't look like a transport stream..."); + + mDataSource->queueEOS(ERROR_UNSUPPORTED); + return; + } + + if (explicitDiscontinuity + || (mPrevBandwidthIndex >= 0 + && (size_t)mPrevBandwidthIndex != bandwidthIndex)) { + // Signal discontinuity. + + sp<ABuffer> tmp = new ABuffer(188); + memset(tmp->data(), 0, tmp->size()); + + mDataSource->queueBuffer(tmp); + } + + mDataSource->queueBuffer(buffer); + + mPrevBandwidthIndex = bandwidthIndex; + ++mSeqNumber; + + postMonitorQueue(); +} + +void LiveSession::onMonitorQueue() { + if (mSeekTimeUs >= 0 + || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) { + onDownloadNext(); + } else { + postMonitorQueue(1000000ll); + } +} + +status_t LiveSession::decryptBuffer( + size_t playlistIndex, const sp<ABuffer> &buffer) { + sp<AMessage> itemMeta; + bool found = false; + AString method; + + for (ssize_t i = playlistIndex; i >= 0; --i) { + AString uri; + CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); + + if (itemMeta->findString("cipher-method", &method)) { + found = true; + break; + } + } + + if (!found) { + method = "NONE"; + } + + if (method == "NONE") { + return OK; + } else if (!(method == "AES-128")) { + LOGE("Unsupported cipher method '%s'", method.c_str()); + return ERROR_UNSUPPORTED; + } + + AString keyURI; + if (!itemMeta->findString("cipher-uri", &keyURI)) { + LOGE("Missing key uri"); + return ERROR_MALFORMED; + } + + ssize_t index = mAESKeyForURI.indexOfKey(keyURI); + + sp<ABuffer> key; + if (index >= 0) { + key = mAESKeyForURI.valueAt(index); + } else { + key = new ABuffer(16); + + sp<NuHTTPDataSource> keySource = new NuHTTPDataSource; + status_t err = keySource->connect(keyURI.c_str()); + + if (err == OK) { + size_t offset = 0; + while (offset < 16) { + ssize_t n = keySource->readAt( + offset, key->data() + offset, 16 - offset); + if (n <= 0) { + err = ERROR_IO; + break; + } + + offset += n; + } + } + + if (err != OK) { + LOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); + return ERROR_IO; + } + + mAESKeyForURI.add(keyURI, key); + } + + AES_KEY aes_key; + if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { + LOGE("failed to set AES decryption key."); + return UNKNOWN_ERROR; + } + + unsigned char aes_ivec[16]; + + AString iv; + if (itemMeta->findString("cipher-iv", &iv)) { + if ((!iv.startsWith("0x") && !iv.startsWith("0X")) + || iv.size() != 16 * 2 + 2) { + LOGE("malformed cipher IV '%s'.", iv.c_str()); + return ERROR_MALFORMED; + } + + memset(aes_ivec, 0, sizeof(aes_ivec)); + for (size_t i = 0; i < 16; ++i) { + char c1 = tolower(iv.c_str()[2 + 2 * i]); + char c2 = tolower(iv.c_str()[3 + 2 * i]); + if (!isxdigit(c1) || !isxdigit(c2)) { + LOGE("malformed cipher IV '%s'.", iv.c_str()); + return ERROR_MALFORMED; + } + uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; + uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; + + aes_ivec[i] = nibble1 << 4 | nibble2; + } + } else { + memset(aes_ivec, 0, sizeof(aes_ivec)); + aes_ivec[15] = mSeqNumber & 0xff; + aes_ivec[14] = (mSeqNumber >> 8) & 0xff; + aes_ivec[13] = (mSeqNumber >> 16) & 0xff; + aes_ivec[12] = (mSeqNumber >> 24) & 0xff; + } + + AES_cbc_encrypt( + buffer->data(), buffer->data(), buffer->size(), + &aes_key, aes_ivec, AES_DECRYPT); + + // hexdump(buffer->data(), buffer->size()); + + size_t n = buffer->size(); + CHECK_GT(n, 0u); + + size_t pad = buffer->data()[n - 1]; + + CHECK_GT(pad, 0u); + CHECK_LE(pad, 16u); + CHECK_GE((size_t)n, pad); + for (size_t i = 0; i < pad; ++i) { + CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad); + } + + n -= pad; + + buffer->setRange(buffer->offset(), n); + + return OK; +} + +void LiveSession::postMonitorQueue(int64_t delayUs) { + sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); + msg->setInt32("generation", ++mMonitorQueueGeneration); + msg->post(delayUs); +} + +void LiveSession::onSeek(const sp<AMessage> &msg) { + int64_t timeUs; + CHECK(msg->findInt64("timeUs", &timeUs)); + + mSeekTimeUs = timeUs; + postMonitorQueue(); +} + +status_t LiveSession::getDuration(int64_t *durationUs) { + Mutex::Autolock autoLock(mLock); + *durationUs = mDurationUs; + + return OK; +} + +bool LiveSession::isSeekable() { + int64_t durationUs; + return getDuration(&durationUs) == OK && durationUs >= 0; +} + +} // namespace android + |