From 43ca783effd99bba0e6e2dd6fe177a8888578ef8 Mon Sep 17 00:00:00 2001 From: Robert Shih Date: Thu, 27 Feb 2014 12:33:24 -0800 Subject: httplive: block-by-block fetch, decrypt, and parse ts files. Bug: 12060952 Change-Id: I695345081fe23961b9d0ef6db264885f914703ec --- media/libstagefright/httplive/LiveSession.cpp | 10 +- media/libstagefright/httplive/LiveSession.h | 2 +- media/libstagefright/httplive/PlaylistFetcher.cpp | 451 +++++++++++++--------- media/libstagefright/httplive/PlaylistFetcher.h | 5 + media/libstagefright/mpeg2ts/ATSParser.h | 5 +- 5 files changed, 285 insertions(+), 188 deletions(-) (limited to 'media/libstagefright') diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index 3e11759..e53e35a 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -626,7 +626,7 @@ sp LiveSession::addFetcher(const char *uri) { * - block_size == 0 means entire range * */ -status_t LiveSession::fetchFile( +ssize_t LiveSession::fetchFile( const char *url, sp *out, int64_t range_offset, int64_t range_length, uint32_t block_size, /* download block size */ @@ -677,6 +677,7 @@ status_t LiveSession::fetchFile( buffer->setRange(0, 0); } + ssize_t bytesRead = 0; // adjust range_length if only reading partial block if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) { range_length = buffer->size() + block_size; @@ -724,6 +725,7 @@ status_t LiveSession::fetchFile( } buffer->setRange(0, buffer->size() + (size_t)n); + bytesRead += n; } *out = buffer; @@ -734,7 +736,7 @@ status_t LiveSession::fetchFile( } } - return OK; + return bytesRead; } sp LiveSession::fetchPlaylist( @@ -745,9 +747,9 @@ sp LiveSession::fetchPlaylist( sp buffer; String8 actualUrl; - status_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); + ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); - if (err != OK) { + if (err <= 0) { return NULL; } diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h index 680792d..3f8fee5 100644 --- a/media/libstagefright/httplive/LiveSession.h +++ b/media/libstagefright/httplive/LiveSession.h @@ -202,7 +202,7 @@ private: // // For reused HTTP sources, the caller must download a file sequentially without // any overlaps or gaps to prevent reconnection. - status_t fetchFile( + ssize_t fetchFile( const char *url, sp *out, /* request/open a file starting at range_offset for range_length bytes */ int64_t range_offset = 0, int64_t range_length = -1, diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp index 012a68b..ada856d 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.cpp +++ b/media/libstagefright/httplive/PlaylistFetcher.cpp @@ -48,6 +48,7 @@ namespace android { // static const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; +const int32_t PlaylistFetcher::kDownloadBlockSize = 192; const int32_t PlaylistFetcher::kNumSkipFrames = 10; PlaylistFetcher::PlaylistFetcher( @@ -216,9 +217,9 @@ status_t PlaylistFetcher::decryptBuffer( if (index >= 0) { key = mAESKeyForURI.valueAt(index); } else { - status_t err = mSession->fetchFile(keyURI.c_str(), &key); + ssize_t err = mSession->fetchFile(keyURI.c_str(), &key); - if (err != OK) { + if (err < 0) { ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); return ERROR_IO; } else if (key->size() != 16) { @@ -704,6 +705,11 @@ status_t PlaylistFetcher::refreshPlaylist() { return OK; } +// static +bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp& buffer) { + return buffer->size() > 0 && buffer->data()[0] == 0x47; +} + void PlaylistFetcher::onDownloadNext() { if (refreshPlaylist() != OK) { return; @@ -823,66 +829,161 @@ void PlaylistFetcher::onDownloadNext() { ALOGV("fetching '%s'", uri.c_str()); - sp buffer; - status_t err = mSession->fetchFile( - uri.c_str(), &buffer, range_offset, range_length); - - if (err != OK) { - ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); - notifyError(err); - return; + sp source; + sp buffer, tsBuffer; + // decrypt a junk buffer to prefetch key; since a session uses only one http connection, + // this avoids interleaved connections to the key and segment file. + { + sp junk = new ABuffer(16); + junk->setRange(0, 16); + status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk, + true /* first */); + if (err != OK) { + notifyError(err); + return; + } } - CHECK(buffer != NULL); + // block-wise download + ssize_t bytesRead; + do { + bytesRead = mSession->fetchFile( + uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source); - err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer); - if (err == OK) { - err = checkDecryptPadding(buffer); - } + if (bytesRead < 0) { + status_t err = bytesRead; + ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); + notifyError(err); + return; + } - if (err != OK) { - ALOGE("decryptBuffer failed w/ error %d", err); + CHECK(buffer != NULL); - notifyError(err); - return; - } + size_t size = buffer->size(); + // Set decryption range. + buffer->setRange(size - bytesRead, bytesRead); + status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer, + buffer->offset() == 0 /* first */); + // Unset decryption range. + buffer->setRange(0, size); - if (mStartup || seekDiscontinuity || explicitDiscontinuity) { - // Signal discontinuity. + if (err != OK) { + ALOGE("decryptBuffer failed w/ error %d", err); - if (mPlaylist->isComplete() || mPlaylist->isEvent()) { - // If this was a live event this made no sense since - // we don't have access to all the segment before the current - // one. - mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); + notifyError(err); + return; } - if (seekDiscontinuity || explicitDiscontinuity) { - ALOGI("queueing discontinuity (seek=%d, explicit=%d)", - seekDiscontinuity, explicitDiscontinuity); + if (mStartup || seekDiscontinuity || explicitDiscontinuity) { + // Signal discontinuity. + + if (mPlaylist->isComplete() || mPlaylist->isEvent()) { + // If this was a live event this made no sense since + // we don't have access to all the segment before the current + // one. + mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); + } + + if (seekDiscontinuity || explicitDiscontinuity) { + ALOGI("queueing discontinuity (seek=%d, explicit=%d)", + seekDiscontinuity, explicitDiscontinuity); - queueDiscontinuity( - explicitDiscontinuity - ? ATSParser::DISCONTINUITY_FORMATCHANGE - : ATSParser::DISCONTINUITY_SEEK, - NULL /* extra */); + queueDiscontinuity( + explicitDiscontinuity + ? ATSParser::DISCONTINUITY_FORMATCHANGE + : ATSParser::DISCONTINUITY_SEEK, + NULL /* extra */); + } } - } - err = extractAndQueueAccessUnits(buffer, itemMeta); + err = OK; + if (bufferStartsWithTsSyncByte(buffer)) { + // Incremental extraction is only supported for MPEG2 transport streams. + if (tsBuffer == NULL) { + tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); + tsBuffer->setRange(0, 0); + } else if (tsBuffer->capacity() != buffer->capacity()) { + size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); + tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); + tsBuffer->setRange(tsOff, tsSize); + } + tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); + + err = extractAndQueueAccessUnitsFromTs(tsBuffer); + } + + if (err == -EAGAIN) { + // bad starting sequence number hint + postMonitorQueue(); + return; + } + + if (err == ERROR_OUT_OF_RANGE) { + // reached stopping point + stopAsync(/* selfTriggered = */ true); + return; + } + + if (err != OK) { + notifyError(err); + return; + } + + mStartup = false; + } while (bytesRead != 0); + + if (bufferStartsWithTsSyncByte(buffer)) { + // If we still don't see a stream after fetching a full ts segment mark it as + // nonexistent. + const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES; + ATSParser::SourceType srcTypes[kNumTypes] = + { ATSParser::VIDEO, ATSParser::AUDIO }; + LiveSession::StreamType streamTypes[kNumTypes] = + { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO }; + + for (size_t i = 0; i < kNumTypes; i++) { + ATSParser::SourceType srcType = srcTypes[i]; + LiveSession::StreamType streamType = streamTypes[i]; + + sp source = + static_cast( + mTSParser->getSource(srcType).get()); + + if (source == NULL) { + ALOGW("MPEG2 Transport stream does not contain %s data.", + srcType == ATSParser::VIDEO ? "video" : "audio"); + + mStreamTypeMask &= ~streamType; + mPacketSources.removeItem(streamType); + } + } - if (err == -EAGAIN) { - // bad starting sequence number hint - postMonitorQueue(); - return; } - if (err == ERROR_OUT_OF_RANGE) { - // reached stopping point - stopAsync(/* selfTriggered = */ true); + if (checkDecryptPadding(buffer) != OK) { + ALOGE("Incorrect padding bytes after decryption."); + notifyError(ERROR_MALFORMED); return; } + status_t err = OK; + if (tsBuffer != NULL) { + AString method; + CHECK(buffer->meta()->findString("cipher-method", &method)); + if ((tsBuffer->size() > 0 && method == "NONE") + || tsBuffer->size() > 16) { + ALOGE("MPEG2 transport stream is not an even multiple of 188 " + "bytes in length."); + notifyError(ERROR_MALFORMED); + return; + } + } + + // bulk extract non-ts files + if (tsBuffer == NULL) { + err = extractAndQueueAccessUnits(buffer, itemMeta); + } + if (err != OK) { notifyError(err); return; @@ -891,8 +992,6 @@ void PlaylistFetcher::onDownloadNext() { ++mSeqNumber; postMonitorQueue(); - - mStartup = false; } int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { @@ -927,173 +1026,163 @@ int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { return firstSeqNumberInPlaylist + index; } -status_t PlaylistFetcher::extractAndQueueAccessUnits( - const sp &buffer, const sp &itemMeta) { - if (buffer->size() > 0 && buffer->data()[0] == 0x47) { - // Let's assume this is an MPEG2 transport stream. - - if ((buffer->size() % 188) != 0) { - ALOGE("MPEG2 transport stream is not an even multiple of 188 " - "bytes in length."); - return ERROR_MALFORMED; - } - - if (mTSParser == NULL) { - // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. - mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); - } - - if (mNextPTSTimeUs >= 0ll) { - sp extra = new AMessage; - // Since we are using absolute timestamps, signal an offset of 0 to prevent - // ATSParser from skewing the timestamps of access units. - extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); +status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &buffer) { + if (mTSParser == NULL) { + // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. + mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); + } - mTSParser->signalDiscontinuity( - ATSParser::DISCONTINUITY_SEEK, extra); + if (mNextPTSTimeUs >= 0ll) { + sp extra = new AMessage; + // Since we are using absolute timestamps, signal an offset of 0 to prevent + // ATSParser from skewing the timestamps of access units. + extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); - mNextPTSTimeUs = -1ll; - } + mTSParser->signalDiscontinuity( + ATSParser::DISCONTINUITY_SEEK, extra); - size_t offset = 0; - while (offset < buffer->size()) { - status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); + mNextPTSTimeUs = -1ll; + } - if (err != OK) { - return err; - } + size_t offset = 0; + while (offset + 188 <= buffer->size()) { + status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); - offset += 188; + if (err != OK) { + return err; } - status_t err = OK; - for (size_t i = mPacketSources.size(); i-- > 0;) { - sp packetSource = mPacketSources.valueAt(i); - - const char *key; - ATSParser::SourceType type; - const LiveSession::StreamType stream = mPacketSources.keyAt(i); - switch (stream) { + offset += 188; + } + // setRange to indicate consumed bytes. + buffer->setRange(buffer->offset() + offset, buffer->size() - offset); - case LiveSession::STREAMTYPE_VIDEO: - type = ATSParser::VIDEO; - key = "timeUsVideo"; - break; + status_t err = OK; + for (size_t i = mPacketSources.size(); i-- > 0;) { + sp packetSource = mPacketSources.valueAt(i); - case LiveSession::STREAMTYPE_AUDIO: - type = ATSParser::AUDIO; - key = "timeUsAudio"; - break; + const char *key; + ATSParser::SourceType type; + const LiveSession::StreamType stream = mPacketSources.keyAt(i); + switch (stream) { + case LiveSession::STREAMTYPE_VIDEO: + type = ATSParser::VIDEO; + key = "timeUsVideo"; + break; - case LiveSession::STREAMTYPE_SUBTITLES: - { - ALOGE("MPEG2 Transport streams do not contain subtitles."); - return ERROR_MALFORMED; - break; - } + case LiveSession::STREAMTYPE_AUDIO: + type = ATSParser::AUDIO; + key = "timeUsAudio"; + break; - default: - TRESPASS(); + case LiveSession::STREAMTYPE_SUBTITLES: + { + ALOGE("MPEG2 Transport streams do not contain subtitles."); + return ERROR_MALFORMED; + break; } - sp source = - static_cast( - mTSParser->getSource(type).get()); + default: + TRESPASS(); + } - if (source == NULL) { - ALOGW("MPEG2 Transport stream does not contain %s data.", - type == ATSParser::VIDEO ? "video" : "audio"); + sp source = + static_cast( + mTSParser->getSource(type).get()); - mStreamTypeMask &= ~mPacketSources.keyAt(i); - mPacketSources.removeItemsAt(i); - continue; - } + if (source == NULL) { + continue; + } - int64_t timeUs; - sp accessUnit; - status_t finalResult; - while (source->hasBufferAvailable(&finalResult) - && source->dequeueAccessUnit(&accessUnit) == OK) { - - CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); - if (mMinStartTimeUs > 0) { - if (timeUs < mMinStartTimeUs) { - // TODO untested path - // try a later ts - int32_t targetDuration; - mPlaylist->meta()->findInt32("target-duration", &targetDuration); - int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration; - if (incr == 0) { - // increment mSeqNumber by at least one - incr = 1; - } - mSeqNumber += incr; - err = -EAGAIN; - break; - } else { - int64_t startTimeUs; - if (mStartTimeUsNotify != NULL - && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) { - mStartTimeUsNotify->setInt64(key, timeUs); - - uint32_t streamMask = 0; - mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); - streamMask |= mPacketSources.keyAt(i); - mStartTimeUsNotify->setInt32("streamMask", streamMask); - - if (streamMask == mStreamTypeMask) { - mStartTimeUsNotify->post(); - mStartTimeUsNotify.clear(); - } + int64_t timeUs; + sp accessUnit; + status_t finalResult; + while (source->hasBufferAvailable(&finalResult) + && source->dequeueAccessUnit(&accessUnit) == OK) { + + CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); + if (mMinStartTimeUs > 0) { + if (timeUs < mMinStartTimeUs) { + // TODO untested path + // try a later ts + int32_t targetDuration; + mPlaylist->meta()->findInt32("target-duration", &targetDuration); + int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration; + if (incr == 0) { + // increment mSeqNumber by at least one + incr = 1; + } + mSeqNumber += incr; + err = -EAGAIN; + break; + } else { + int64_t startTimeUs; + if (mStartTimeUsNotify != NULL + && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) { + mStartTimeUsNotify->setInt64(key, timeUs); + + uint32_t streamMask = 0; + mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); + streamMask |= mPacketSources.keyAt(i); + mStartTimeUsNotify->setInt32("streamMask", streamMask); + + if (streamMask == mStreamTypeMask) { + mStartTimeUsNotify->post(); + mStartTimeUsNotify.clear(); } } } + } - if (mStopParams != NULL) { - // Queue discontinuity in original stream. - int64_t stopTimeUs; - if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) { - packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); - mStreamTypeMask &= ~stream; - mPacketSources.removeItemsAt(i); - break; - } + if (mStopParams != NULL) { + // Queue discontinuity in original stream. + int64_t stopTimeUs; + if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) { + packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); + mStreamTypeMask &= ~stream; + mPacketSources.removeItemsAt(i); + break; } + } - // Note that we do NOT dequeue any discontinuities except for format change. - - // for simplicity, store a reference to the format in each unit - sp format = source->getFormat(); - if (format != NULL) { - accessUnit->meta()->setObject("format", format); - } + // Note that we do NOT dequeue any discontinuities except for format change. - // Stash the sequence number so we can hint future fetchers where to start at. - accessUnit->meta()->setInt32("seq", mSeqNumber); - packetSource->queueAccessUnit(accessUnit); + // for simplicity, store a reference to the format in each unit + sp format = source->getFormat(); + if (format != NULL) { + accessUnit->meta()->setObject("format", format); } - if (err != OK) { - break; - } + // Stash the sequence number so we can hint future playlist where to start at. + accessUnit->meta()->setInt32("seq", mSeqNumber); + packetSource->queueAccessUnit(accessUnit); } if (err != OK) { - for (size_t i = mPacketSources.size(); i-- > 0;) { - sp packetSource = mPacketSources.valueAt(i); - packetSource->clear(); - } - return err; + break; } + } - if (!mStreamTypeMask) { - // Signal gap is filled between original and new stream. - ALOGV("ERROR OUT OF RANGE"); - return ERROR_OUT_OF_RANGE; + if (err != OK) { + for (size_t i = mPacketSources.size(); i-- > 0;) { + sp packetSource = mPacketSources.valueAt(i); + packetSource->clear(); } + return err; + } - return OK; - } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) { + if (!mStreamTypeMask) { + // Signal gap is filled between original and new stream. + ALOGV("ERROR OUT OF RANGE"); + return ERROR_OUT_OF_RANGE; + } + + return OK; +} + +status_t PlaylistFetcher::extractAndQueueAccessUnits( + const sp &buffer, const sp &itemMeta) { + if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) { if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { ALOGE("This stream only contains subtitles."); return ERROR_MALFORMED; diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h index 8404b8d..7e21523 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.h +++ b/media/libstagefright/httplive/PlaylistFetcher.h @@ -87,8 +87,11 @@ private: static const int64_t kMinBufferedDurationUs; static const int64_t kMaxMonitorDelayUs; + static const int32_t kDownloadBlockSize; static const int32_t kNumSkipFrames; + static bool bufferStartsWithTsSyncByte(const sp& buffer); + // notifications to mSession sp mNotify; sp mStartTimeUsNotify; @@ -169,6 +172,8 @@ private: // Resume a fetcher to continue until the stopping point stored in msg. status_t onResumeUntil(const sp &msg); + status_t extractAndQueueAccessUnitsFromTs(const sp &buffer); + status_t extractAndQueueAccessUnits( const sp &buffer, const sp &itemMeta); diff --git a/media/libstagefright/mpeg2ts/ATSParser.h b/media/libstagefright/mpeg2ts/ATSParser.h index a10edc9..8a80069 100644 --- a/media/libstagefright/mpeg2ts/ATSParser.h +++ b/media/libstagefright/mpeg2ts/ATSParser.h @@ -71,8 +71,9 @@ struct ATSParser : public RefBase { void signalEOS(status_t finalResult); enum SourceType { - VIDEO, - AUDIO + VIDEO = 0, + AUDIO = 1, + NUM_SOURCE_TYPES = 2 }; sp getSource(SourceType type); -- cgit v1.1