/* * Copyright (C) 2012 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "PlaylistFetcher" #include #include #include "PlaylistFetcher.h" #include "HTTPDownloader.h" #include "LiveSession.h" #include "M3UParser.h" #include "include/avc_utils.h" #include "include/ID3.h" #include "mpeg2ts/AnotherPacketSource.h" #include #include #include #include #include #include #include #include #include #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__) #define FSLOGV(stream, fmt, ...) 
ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \ LiveSession::getNameForStream(stream), ##__VA_ARGS__) namespace android { // static const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll; const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; // LCM of 188 (size of a TS packet) & 1k works well const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024; struct PlaylistFetcher::DownloadState : public RefBase { DownloadState(); void resetState(); bool hasSavedState() const; void restoreState( AString &uri, sp &itemMeta, sp &buffer, sp &tsBuffer, int32_t &firstSeqNumberInPlaylist, int32_t &lastSeqNumberInPlaylist); void saveState( AString &uri, sp &itemMeta, sp &buffer, sp &tsBuffer, int32_t &firstSeqNumberInPlaylist, int32_t &lastSeqNumberInPlaylist); private: bool mHasSavedState; AString mUri; sp mItemMeta; sp mBuffer; sp mTsBuffer; int32_t mFirstSeqNumberInPlaylist; int32_t mLastSeqNumberInPlaylist; }; PlaylistFetcher::DownloadState::DownloadState() { resetState(); } bool PlaylistFetcher::DownloadState::hasSavedState() const { return mHasSavedState; } void PlaylistFetcher::DownloadState::resetState() { mHasSavedState = false; mUri.clear(); mItemMeta = NULL; mBuffer = NULL; mTsBuffer = NULL; mFirstSeqNumberInPlaylist = 0; mLastSeqNumberInPlaylist = 0; } void PlaylistFetcher::DownloadState::restoreState( AString &uri, sp &itemMeta, sp &buffer, sp &tsBuffer, int32_t &firstSeqNumberInPlaylist, int32_t &lastSeqNumberInPlaylist) { if (!mHasSavedState) { return; } uri = mUri; itemMeta = mItemMeta; buffer = mBuffer; tsBuffer = mTsBuffer; firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist; lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist; resetState(); } void PlaylistFetcher::DownloadState::saveState( AString &uri, sp &itemMeta, sp &buffer, sp &tsBuffer, int32_t &firstSeqNumberInPlaylist, int32_t &lastSeqNumberInPlaylist) { mHasSavedState = true; mUri = uri; mItemMeta = itemMeta; mBuffer = buffer; mTsBuffer = tsBuffer; mFirstSeqNumberInPlaylist = 
firstSeqNumberInPlaylist; mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist; } PlaylistFetcher::PlaylistFetcher( const sp ¬ify, const sp &session, const char *uri, int32_t id, int32_t subtitleGeneration) : mNotify(notify), mSession(session), mURI(uri), mFetcherID(id), mStreamTypeMask(0), mStartTimeUs(-1ll), mSegmentStartTimeUs(-1ll), mDiscontinuitySeq(-1ll), mStartTimeUsRelative(false), mLastPlaylistFetchTimeUs(-1ll), mPlaylistTimeUs(-1ll), mSeqNumber(-1), mNumRetries(0), mStartup(true), mIDRFound(false), mSeekMode(LiveSession::kSeekModeExactPosition), mTimeChangeSignaled(false), mNextPTSTimeUs(-1ll), mMonitorQueueGeneration(0), mSubtitleGeneration(subtitleGeneration), mLastDiscontinuitySeq(-1ll), mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), mFirstPTSValid(false), mFirstTimeUs(-1ll), mVideoBuffer(new AnotherPacketSource(NULL)), mThresholdRatio(-1.0f), mDownloadState(new DownloadState()), mHasMetadata(false) { memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); mHTTPDownloader = mSession->getHTTPDownloader(); } PlaylistFetcher::~PlaylistFetcher() { } int32_t PlaylistFetcher::getFetcherID() const { return mFetcherID; } int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { CHECK(mPlaylist != NULL); int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; mPlaylist->getSeqNumberRange( &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist); CHECK_GE(seqNumber, firstSeqNumberInPlaylist); CHECK_LE(seqNumber, lastSeqNumberInPlaylist); int64_t segmentStartUs = 0ll; for (int32_t index = 0; index < seqNumber - firstSeqNumberInPlaylist; ++index) { sp itemMeta; CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); int64_t itemDurationUs; CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); segmentStartUs += itemDurationUs; } return segmentStartUs; } int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const { CHECK(mPlaylist != NULL); int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; mPlaylist->getSeqNumberRange( 
&firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist); CHECK_GE(seqNumber, firstSeqNumberInPlaylist); CHECK_LE(seqNumber, lastSeqNumberInPlaylist); int32_t index = seqNumber - firstSeqNumberInPlaylist; sp itemMeta; CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); int64_t itemDurationUs; CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); return itemDurationUs; } int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { int64_t nowUs = ALooper::GetNowUs(); if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); return 0ll; } if (mPlaylist->isComplete()) { return (~0llu >> 1); } int64_t targetDurationUs = mPlaylist->getTargetDuration(); int64_t minPlaylistAgeUs; switch (mRefreshState) { case INITIAL_MINIMUM_RELOAD_DELAY: { size_t n = mPlaylist->size(); if (n > 0) { sp itemMeta; CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta)); int64_t itemDurationUs; CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); minPlaylistAgeUs = itemDurationUs; break; } // fall through } case FIRST_UNCHANGED_RELOAD_ATTEMPT: { minPlaylistAgeUs = targetDurationUs / 2; break; } case SECOND_UNCHANGED_RELOAD_ATTEMPT: { minPlaylistAgeUs = (targetDurationUs * 3) / 2; break; } case THIRD_UNCHANGED_RELOAD_ATTEMPT: { minPlaylistAgeUs = targetDurationUs * 3; break; } default: TRESPASS(); break; } int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; return delayUs > 0ll ? 
delayUs : 0ll; } status_t PlaylistFetcher::decryptBuffer( size_t playlistIndex, const sp &buffer, bool first) { sp itemMeta; bool found = false; AString method; for (ssize_t i = playlistIndex; i >= 0; --i) { AString uri; CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); if (itemMeta->findString("cipher-method", &method)) { found = true; break; } } if (!found) { method = "NONE"; } buffer->meta()->setString("cipher-method", method.c_str()); if (method == "NONE") { return OK; } else if (!(method == "AES-128")) { ALOGE("Unsupported cipher method '%s'", method.c_str()); return ERROR_UNSUPPORTED; } AString keyURI; if (!itemMeta->findString("cipher-uri", &keyURI)) { ALOGE("Missing key uri"); return ERROR_MALFORMED; } ssize_t index = mAESKeyForURI.indexOfKey(keyURI); sp key; if (index >= 0) { key = mAESKeyForURI.valueAt(index); } else { ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key); if (err == ERROR_NOT_CONNECTED) { return ERROR_NOT_CONNECTED; } else if (err < 0) { ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); return ERROR_IO; } else if (key->size() != 16) { ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); return ERROR_MALFORMED; } mAESKeyForURI.add(keyURI, key); } AES_KEY aes_key; if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { ALOGE("failed to set AES decryption key."); return UNKNOWN_ERROR; } size_t n = buffer->size(); if (!n) { return OK; } CHECK(n % 16 == 0); if (first) { // If decrypting the first block in a file, read the iv from the manifest // or derive the iv from the file's sequence number. 
AString iv; if (itemMeta->findString("cipher-iv", &iv)) { if ((!iv.startsWith("0x") && !iv.startsWith("0X")) || iv.size() != 16 * 2 + 2) { ALOGE("malformed cipher IV '%s'.", iv.c_str()); return ERROR_MALFORMED; } memset(mAESInitVec, 0, sizeof(mAESInitVec)); for (size_t i = 0; i < 16; ++i) { char c1 = tolower(iv.c_str()[2 + 2 * i]); char c2 = tolower(iv.c_str()[3 + 2 * i]); if (!isxdigit(c1) || !isxdigit(c2)) { ALOGE("malformed cipher IV '%s'.", iv.c_str()); return ERROR_MALFORMED; } uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; mAESInitVec[i] = nibble1 << 4 | nibble2; } } else { memset(mAESInitVec, 0, sizeof(mAESInitVec)); mAESInitVec[15] = mSeqNumber & 0xff; mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; } } AES_cbc_encrypt( buffer->data(), buffer->data(), buffer->size(), &aes_key, mAESInitVec, AES_DECRYPT); return OK; } status_t PlaylistFetcher::checkDecryptPadding(const sp &buffer) { AString method; CHECK(buffer->meta()->findString("cipher-method", &method)); if (method == "NONE") { return OK; } uint8_t padding = 0; if (buffer->size() > 0) { padding = buffer->data()[buffer->size() - 1]; } if (padding > 16) { return ERROR_MALFORMED; } for (size_t i = buffer->size() - padding; i < padding; i++) { if (buffer->data()[i] != padding) { return ERROR_MALFORMED; } } buffer->setRange(buffer->offset(), buffer->size() - padding); return OK; } void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { int64_t maxDelayUs = delayUsToRefreshPlaylist(); if (maxDelayUs < minDelayUs) { maxDelayUs = minDelayUs; } if (delayUs > maxDelayUs) { FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs); delayUs = maxDelayUs; } sp msg = new AMessage(kWhatMonitorQueue, this); msg->setInt32("generation", mMonitorQueueGeneration); msg->post(delayUs); } void PlaylistFetcher::cancelMonitorQueue() { 
++mMonitorQueueGeneration; } void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) { { AutoMutex _l(mThresholdLock); mThresholdRatio = thresholdRatio; } if (disconnect) { mHTTPDownloader->disconnect(); } } void PlaylistFetcher::resetStoppingThreshold(bool disconnect) { { AutoMutex _l(mThresholdLock); mThresholdRatio = -1.0f; } if (disconnect) { mHTTPDownloader->disconnect(); } else { // allow reconnect mHTTPDownloader->reconnect(); } } float PlaylistFetcher::getStoppingThreshold() { AutoMutex _l(mThresholdLock); return mThresholdRatio; } void PlaylistFetcher::startAsync( const sp &audioSource, const sp &videoSource, const sp &subtitleSource, const sp &metadataSource, int64_t startTimeUs, int64_t segmentStartTimeUs, int32_t startDiscontinuitySeq, LiveSession::SeekMode seekMode) { sp msg = new AMessage(kWhatStart, this); uint32_t streamTypeMask = 0ul; if (audioSource != NULL) { msg->setPointer("audioSource", audioSource.get()); streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; } if (videoSource != NULL) { msg->setPointer("videoSource", videoSource.get()); streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; } if (subtitleSource != NULL) { msg->setPointer("subtitleSource", subtitleSource.get()); streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; } if (metadataSource != NULL) { msg->setPointer("metadataSource", metadataSource.get()); // metadataSource does not affect streamTypeMask. 
} msg->setInt32("streamTypeMask", streamTypeMask); msg->setInt64("startTimeUs", startTimeUs); msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); msg->setInt32("seekMode", seekMode); msg->post(); } /* * pauseAsync * * threshold: 0.0f - pause after current fetch block (default 47Kbytes) * -1.0f - pause after finishing current segment * 0.0~1.0f - pause if remaining of current segment exceeds threshold */ void PlaylistFetcher::pauseAsync( float thresholdRatio, bool disconnect) { setStoppingThreshold(thresholdRatio, disconnect); (new AMessage(kWhatPause, this))->post(); } void PlaylistFetcher::stopAsync(bool clear) { setStoppingThreshold(0.0f, true /* disconncect */); sp msg = new AMessage(kWhatStop, this); msg->setInt32("clear", clear); msg->post(); } void PlaylistFetcher::resumeUntilAsync(const sp ¶ms) { FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str()); AMessage* msg = new AMessage(kWhatResumeUntil, this); msg->setMessage("params", params); msg->post(); } void PlaylistFetcher::fetchPlaylistAsync() { (new AMessage(kWhatFetchPlaylist, this))->post(); } void PlaylistFetcher::onMessageReceived(const sp &msg) { switch (msg->what()) { case kWhatStart: { status_t err = onStart(msg); sp notify = mNotify->dup(); notify->setInt32("what", kWhatStarted); notify->setInt32("err", err); notify->post(); break; } case kWhatPause: { onPause(); sp notify = mNotify->dup(); notify->setInt32("what", kWhatPaused); notify->setInt32("seekMode", mDownloadState->hasSavedState() ? 
LiveSession::kSeekModeNextSample : LiveSession::kSeekModeNextSegment); notify->post(); break; } case kWhatStop: { onStop(msg); sp notify = mNotify->dup(); notify->setInt32("what", kWhatStopped); notify->post(); break; } case kWhatFetchPlaylist: { bool unchanged; sp playlist = mHTTPDownloader->fetchPlaylist( mURI.c_str(), NULL /* curPlaylistHash */, &unchanged); sp notify = mNotify->dup(); notify->setInt32("what", kWhatPlaylistFetched); notify->setObject("playlist", playlist); notify->post(); break; } case kWhatMonitorQueue: case kWhatDownloadNext: { int32_t generation; CHECK(msg->findInt32("generation", &generation)); if (generation != mMonitorQueueGeneration) { // Stale event break; } if (msg->what() == kWhatMonitorQueue) { onMonitorQueue(); } else { onDownloadNext(); } break; } case kWhatResumeUntil: { onResumeUntil(msg); break; } default: TRESPASS(); } } status_t PlaylistFetcher::onStart(const sp &msg) { mPacketSources.clear(); mStopParams.clear(); mStartTimeUsNotify = mNotify->dup(); mStartTimeUsNotify->setInt32("what", kWhatStartedAt); mStartTimeUsNotify->setString("uri", mURI); uint32_t streamTypeMask; CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); int64_t startTimeUs; int64_t segmentStartTimeUs; int32_t startDiscontinuitySeq; int32_t seekMode; CHECK(msg->findInt64("startTimeUs", &startTimeUs)); CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); CHECK(msg->findInt32("seekMode", &seekMode)); if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { void *ptr; CHECK(msg->findPointer("audioSource", &ptr)); mPacketSources.add( LiveSession::STREAMTYPE_AUDIO, static_cast(ptr)); } if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) { void *ptr; CHECK(msg->findPointer("videoSource", &ptr)); mPacketSources.add( LiveSession::STREAMTYPE_VIDEO, static_cast(ptr)); } if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) { void *ptr; 
CHECK(msg->findPointer("subtitleSource", &ptr)); mPacketSources.add( LiveSession::STREAMTYPE_SUBTITLES, static_cast(ptr)); } void *ptr; // metadataSource is not part of streamTypeMask if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO)) && msg->findPointer("metadataSource", &ptr)) { mPacketSources.add( LiveSession::STREAMTYPE_METADATA, static_cast(ptr)); } mStreamTypeMask = streamTypeMask; mSegmentStartTimeUs = segmentStartTimeUs; if (startDiscontinuitySeq >= 0) { mDiscontinuitySeq = startDiscontinuitySeq; } mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; mSeekMode = (LiveSession::SeekMode) seekMode; if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) { mStartup = true; mIDRFound = false; mVideoBuffer->clear(); } if (startTimeUs >= 0) { mStartTimeUs = startTimeUs; mFirstPTSValid = false; mSeqNumber = -1; mTimeChangeSignaled = false; mDownloadState->resetState(); } postMonitorQueue(); return OK; } void PlaylistFetcher::onPause() { cancelMonitorQueue(); mLastDiscontinuitySeq = mDiscontinuitySeq; resetStoppingThreshold(false /* disconnect */); } void PlaylistFetcher::onStop(const sp &msg) { cancelMonitorQueue(); int32_t clear; CHECK(msg->findInt32("clear", &clear)); if (clear) { for (size_t i = 0; i < mPacketSources.size(); i++) { sp packetSource = mPacketSources.valueAt(i); packetSource->clear(); } } mDownloadState->resetState(); mPacketSources.clear(); mStreamTypeMask = 0; resetStoppingThreshold(true /* disconnect */); } // Resume until we have reached the boundary timestamps listed in `msg`; when // the remaining time is too short (within a resume threshold) stop immediately // instead. 
status_t PlaylistFetcher::onResumeUntil(const sp &msg) { sp params; CHECK(msg->findMessage("params", ¶ms)); mStopParams = params; onDownloadNext(); return OK; } void PlaylistFetcher::notifyStopReached() { sp notify = mNotify->dup(); notify->setInt32("what", kWhatStopReached); notify->post(); } void PlaylistFetcher::notifyError(status_t err) { sp notify = mNotify->dup(); notify->setInt32("what", kWhatError); notify->setInt32("err", err); notify->post(); } void PlaylistFetcher::queueDiscontinuity( ATSParser::DiscontinuityType type, const sp &extra) { for (size_t i = 0; i < mPacketSources.size(); ++i) { // do not discard buffer upon #EXT-X-DISCONTINUITY tag // (seek will discard buffer by abandoning old fetchers) mPacketSources.valueAt(i)->queueDiscontinuity( type, extra, false /* discard */); } } void PlaylistFetcher::onMonitorQueue() { // in the middle of an unfinished download, delay // playlist refresh as it'll change seq numbers if (!mDownloadState->hasSavedState()) { refreshPlaylist(); } int64_t targetDurationUs = kMinBufferedDurationUs; if (mPlaylist != NULL) { targetDurationUs = mPlaylist->getTargetDuration(); } int64_t bufferedDurationUs = 0ll; status_t finalResult = OK; if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { sp packetSource = mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); bufferedDurationUs = packetSource->getBufferedDurationUs(&finalResult); } else { // Use min stream duration, but ignore streams that never have any packet // enqueued to prevent us from waiting on a non-existent stream; // when we cannot make out from the manifest what streams are included in // a playlist we might assume extra streams. 
bufferedDurationUs = -1ll; for (size_t i = 0; i < mPacketSources.size(); ++i) { if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) { continue; } int64_t bufferedStreamDurationUs = mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs); if (bufferedDurationUs == -1ll || bufferedStreamDurationUs < bufferedDurationUs) { bufferedDurationUs = bufferedStreamDurationUs; } } if (bufferedDurationUs == -1ll) { bufferedDurationUs = 0ll; } } if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) { FLOGV("monitoring, buffered=%lld < %lld", (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs); // delay the next download slightly; hopefully this gives other concurrent fetchers // a better chance to run. // onDownloadNext(); sp msg = new AMessage(kWhatDownloadNext, this); msg->setInt32("generation", mMonitorQueueGeneration); msg->post(1000l); } else { // We'd like to maintain buffering above durationToBufferUs, so try // again when buffer just about to go below durationToBufferUs // (or after targetDurationUs / 2, whichever is smaller). int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll; if (delayUs > targetDurationUs / 2) { delayUs = targetDurationUs / 2; } FLOGV("pausing for %lld, buffered=%lld > %lld", (long long)delayUs, (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs); postMonitorQueue(delayUs); } } status_t PlaylistFetcher::refreshPlaylist() { if (delayUsToRefreshPlaylist() <= 0) { bool unchanged; sp playlist = mHTTPDownloader->fetchPlaylist( mURI.c_str(), mPlaylistHash, &unchanged); if (playlist == NULL) { if (unchanged) { // We succeeded in fetching the playlist, but it was // unchanged from the last time we tried. 
if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) { mRefreshState = (RefreshState)(mRefreshState + 1); } } else { ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str()); return ERROR_IO; } } else { mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; mPlaylist = playlist; if (mPlaylist->isComplete() || mPlaylist->isEvent()) { updateDuration(); } // Notify LiveSession to use target-duration based buffering level // for up/down switch. Default LiveSession::kUpSwitchMark may not // be reachable for live streams, as our max buffering amount is // limited to 3 segments. if (!mPlaylist->isComplete()) { updateTargetDuration(); } mPlaylistTimeUs = ALooper::GetNowUs(); } mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); } return OK; } // static bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp& buffer) { return buffer->size() > 0 && buffer->data()[0] == 0x47; } bool PlaylistFetcher::shouldPauseDownload() { if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { // doesn't apply to subtitles return false; } // Calculate threshold to abort current download float thresholdRatio = getStoppingThreshold(); if (thresholdRatio < 0.0f) { // never abort return false; } else if (thresholdRatio == 0.0f) { // immediately abort return true; } // now we have a positive thresholdUs, abort if remaining // portion to download is over that threshold. if (mSegmentFirstPTS < 0) { // this means we haven't even find the first access unit, // abort now as we must be very far away from the end. 
return true;
    }

    // Latest enqueued timestamp across the active streams, relative to the
    // first PTS of the current segment.
    int64_t lastEnqueueUs = mSegmentFirstPTS;
    for (size_t i = 0; i < mPacketSources.size(); ++i) {
        if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
            continue;
        }
        sp meta = mPacketSources[i]->getLatestEnqueuedMeta();
        int32_t type;
        // Skip sources with nothing enqueued or whose latest entry is a
        // discontinuity marker.
        if (meta == NULL || meta->findInt32("discontinuity", &type)) {
            continue;
        }
        int64_t tmpUs;
        CHECK(meta->findInt64("timeUs", &tmpUs));
        if (tmpUs > lastEnqueueUs) {
            lastEnqueueUs = tmpUs;
        }
    }
    lastEnqueueUs -= mSegmentFirstPTS;

    int64_t targetDurationUs = mPlaylist->getTargetDuration();
    int64_t thresholdUs = thresholdRatio * targetDurationUs;

    FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
            targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
            (long long)thresholdUs,
            (long long)(targetDurationUs - lastEnqueueUs));

    if (targetDurationUs - lastEnqueueUs > thresholdUs) {
        return true;
    }
    return false;
}

// Prepares the download of the next segment.
//
// Refreshes the playlist, picks mSeqNumber when it is not set yet, handles
// a sequence number that is outside the playlist (retry, restart near the
// live edge, or report an error), looks up the segment's uri/metadata,
// prefetches the cipher key and signals time/format discontinuities.
//
// Outputs: `uri` and `itemMeta` of the segment to fetch, and the playlist's
// first/last sequence numbers.
// Returns true when the caller should go on and download the segment; false
// when nothing should be downloaded now (a retry was scheduled, an error or
// stop-reached notification was posted, or the connection is gone).
bool PlaylistFetcher::initDownloadState(
        AString &uri,
        sp &itemMeta,
        int32_t &firstSeqNumberInPlaylist,
        int32_t &lastSeqNumberInPlaylist) {
    status_t err = refreshPlaylist();
    firstSeqNumberInPlaylist = 0;
    lastSeqNumberInPlaylist = 0;
    bool discontinuity = false;

    if (mPlaylist != NULL) {
        mPlaylist->getSeqNumberRange(
                &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);

        if (mDiscontinuitySeq < 0) {
            mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
        }
    }

    mSegmentFirstPTS = -1ll;

    // No sequence number chosen yet: derive it from the start parameters.
    if (mPlaylist != NULL && mSeqNumber < 0) {
        CHECK_GE(mStartTimeUs, 0ll);

        if (mSegmentStartTimeUs < 0) {
            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
                // If this is a live session, start 3 segments from the end on connect
                if (!getSeqNumberInLiveStreaming()) {
                    mSeqNumber = lastSeqNumberInPlaylist - 3;
                }
                if (mSeqNumber < firstSeqNumberInPlaylist) {
                    mSeqNumber = firstSeqNumberInPlaylist;
                }
            } else {
                // When seeking mSegmentStartTimeUs is unavailable (< 0), we
                // use mStartTimeUs (client supplied timestamp) to determine both start segment
                // and relative position inside a segment
                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
            }
            mStartTimeUsRelative = true;
            FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
                    (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
        } else {
            // When adapting or track switching, mSegmentStartTimeUs (relative
            // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
            // timestamps coming from the media container) is used to determine the position
            // inside a segments.
            if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
                    && mSeekMode != LiveSession::kSeekModeNextSample) {
                // avoid double fetch/decode
                // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
                // for the starting segment in new variant.
                // If the two variants' segments are aligned, this gives the
                // next segment. If they're not aligned, this gives the segment
                // that overlaps no more than 1/2 * targetDurationUs.
                mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
                        + mPlaylist->getTargetDuration() / 2);
            } else {
                mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
            }
            // Clamp to the requested discontinuity sequence and to the
            // playlist's range.
            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
            if (mSeqNumber < minSeq) {
                mSeqNumber = minSeq;
            }

            if (mSeqNumber < firstSeqNumberInPlaylist) {
                mSeqNumber = firstSeqNumberInPlaylist;
            }

            if (mSeqNumber > lastSeqNumberInPlaylist) {
                mSeqNumber = lastSeqNumberInPlaylist;
            }
            FLOGV("Initial sequence number is %d from (%d .. %d)",
                    mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
        }
    }

    // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
    if (mSeqNumber < firstSeqNumberInPlaylist
            || mSeqNumber > lastSeqNumberInPlaylist
            || err != OK) {
        if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
            ++mNumRetries;

            if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
                // make sure we reach this retry logic on refresh failures
                // by adding an err != OK clause to all enclosing if's.

                // refresh in increasing fraction (1/2, 1/3, ...) of the
                // playlist's target duration or 3 seconds, whichever is less
                int64_t delayUs = kMaxMonitorDelayUs;
                if (mPlaylist != NULL) {
                    delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
                            / (1 + mNumRetries);
                }
                if (delayUs > kMaxMonitorDelayUs) {
                    delayUs = kMaxMonitorDelayUs;
                }
                FLOGV("sequence number high: %d from (%d .. %d), "
                      "monitor in %lld (retry=%d)",
                        mSeqNumber, firstSeqNumberInPlaylist,
                        lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
                postMonitorQueue(delayUs);
                return false;
            }

            // NOTE(review): the branch above already returns whenever
            // err != OK, so this check looks unreachable — confirm.
            if (err != OK) {
                notifyError(err);
                return false;
            }

            // we've missed the boat, let's start 3 segments prior to the latest sequence
            // number available and signal a discontinuity.

            ALOGI("We've missed the boat, restarting playback."
                  " mStartup=%d, was looking for %d in %d-%d",
                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
            if (mStopParams != NULL) {
                // we should have kept on fetching until we hit the boundaries in mStopParams,
                // but since the segments we are supposed to fetch have already rolled off
                // the playlist, i.e. we have already missed the boat, we inevitably have to
                // skip.
                notifyStopReached();
                return false;
            }
            mSeqNumber = lastSeqNumberInPlaylist - 3;
            if (mSeqNumber < firstSeqNumberInPlaylist) {
                mSeqNumber = firstSeqNumberInPlaylist;
            }
            discontinuity = true;

            // fall through
        } else {
            // Either the retries are used up, or the playlist is complete
            // and was refreshed without error.
            if (mPlaylist != NULL) {
                ALOGE("Cannot find sequence number %d in playlist "
                     "(contains %d - %d)",
                     mSeqNumber, firstSeqNumberInPlaylist,
                      firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);

                if (mTSParser != NULL) {
                    mTSParser->signalEOS(ERROR_END_OF_STREAM);
                    // Use an empty buffer; we don't have any new data, just want to extract
                    // potential new access units after flush.  Reset mSeqNumber to
                    // lastSeqNumberInPlaylist such that we set the correct access unit
                    // properties in extractAndQueueAccessUnitsFromTs.
                    sp buffer = new ABuffer(0);
                    mSeqNumber = lastSeqNumberInPlaylist;
                    extractAndQueueAccessUnitsFromTs(buffer);
                }
                notifyError(ERROR_END_OF_STREAM);
            } else {
                // It's possible that we were never able to download the playlist.
                // In this case we should notify error, instead of EOS, as EOS during
                // prepare means we succeeded in downloading everything.
                ALOGE("Failed to download playlist!");
                notifyError(ERROR_IO);
            }

            return false;
        }
    }

    // mSeqNumber is inside the playlist from here on.
    mNumRetries = 0;

    CHECK(mPlaylist->itemAt(
                mSeqNumber - firstSeqNumberInPlaylist,
                &uri,
                &itemMeta));

    CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));

    int32_t val;
    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
        discontinuity = true;
    } else if (mLastDiscontinuitySeq >= 0
            && mDiscontinuitySeq != mLastDiscontinuitySeq) {
        // Seek jumped to a new discontinuity sequence. We need to signal
        // a format change to decoder. Decoder needs to shutdown and be
        // created again if seamless format change is unsupported.
        FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
                "mDiscontinuitySeq %d, mStartTimeUs %lld",
                mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
        discontinuity = true;
    }
    mLastDiscontinuitySeq = -1;

    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
    // this avoids interleaved connections to the key and segment file.
    {
        sp junk = new ABuffer(16);
        junk->setRange(0, 16);
        // This `err` is local to the block and shadows the one declared at
        // the top of the function.
        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
                true /* first */);
        if (err == ERROR_NOT_CONNECTED) {
            return false;
        } else if (err != OK) {
            notifyError(err);
            return false;
        }
    }

    if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
        // We need to signal a time discontinuity to ATSParser on the
        // first segment after start, or on a discontinuity segment.
        // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
        // to send the time discontinuity.
        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
            // If this was a live event this made no sense since
            // we don't have access to all the segment before the current
            // one.
            mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
        }

        // Setting mTimeChangeSignaled to true, so that if start time
        // searching goes into 2nd segment (without a discontinuity),
        // we don't reset time again. It causes corruption when pending
        // data in ATSParser is cleared.
        mTimeChangeSignaled = true;
    }

    if (discontinuity) {
        // `discontinuity` is always true here, so the log prints explicit=1.
        ALOGI("queueing discontinuity (explicit=%d)", discontinuity);

        // Signal a format discontinuity to ATSParser to clear partial data
        // from previous streams. Not doing this causes bitstream corruption.
        if (mTSParser != NULL) {
            mTSParser->signalDiscontinuity(
                    ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */);
        }

        queueDiscontinuity(
                ATSParser::DISCONTINUITY_FORMAT_ONLY,
                NULL /* extra */);

        if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
            // This means we guessed mStartTimeUs to be in the previous
            // segment (likely very close to the end), but either video or
            // audio has not found start by the end of that segment.
            //
            // If this new segment is not a discontinuity, keep searching.
            //
            // If this new segment even got a discontinuity marker, just
            // set mStartTimeUs=0, and take all samples from now on.
            mStartTimeUs = 0;
            mFirstPTSValid = false;
            mIDRFound = false;
            mVideoBuffer->clear();
        }
    }

    FLOGV("fetching segment %d from (%d .. %d)",
            mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
    return true;
}

void PlaylistFetcher::onDownloadNext() {
    AString uri;
    sp itemMeta;
    sp buffer;
    sp tsBuffer;
    int32_t firstSeqNumberInPlaylist = 0;
    int32_t lastSeqNumberInPlaylist = 0;
    bool connectHTTP = true;

    if (mDownloadState->hasSavedState()) {
        mDownloadState->restoreState(
                uri,
                itemMeta,
                buffer,
                tsBuffer,
                firstSeqNumberInPlaylist,
                lastSeqNumberInPlaylist);
        connectHTTP = false;
        FLOGV("resuming: '%s'", uri.c_str());
    } else {
        if (!initDownloadState(
                uri,
                itemMeta,
                firstSeqNumberInPlaylist,
                lastSeqNumberInPlaylist)) {
            return;
        }
        FLOGV("fetching: '%s'", uri.c_str());
    }

    int64_t range_offset, range_length;
    if (!itemMeta->findInt64("range-offset", &range_offset)
            || !itemMeta->findInt64("range-length", &range_length)) {
        range_offset = 0;
        range_length = -1;
    }

    // block-wise download
    bool shouldPause = false;
    ssize_t bytesRead;
    do {
        int64_t startUs = ALooper::GetNowUs();
        bytesRead = mHTTPDownloader->fetchBlock(
                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
                NULL /* actualURL */, connectHTTP);
        int64_t delayUs = ALooper::GetNowUs() - startUs;

        if (bytesRead == ERROR_NOT_CONNECTED) {
            return;
        }
        if (bytesRead < 0) {
            status_t err = bytesRead;
            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
            notifyError(err);
            return;
        }

        // add sample for bandwidth estimation, excluding samples from subtitles (as
        // its too small), or during startup/resumeUntil (when we could have more than
        // one connection open which affects bandwidth)
        if (!mStartup && mStopParams == NULL && bytesRead > 0
                && (mStreamTypeMask
                        & (LiveSession::STREAMTYPE_AUDIO
                        | LiveSession::STREAMTYPE_VIDEO))) {
            mSession->addBandwidthMeasurement(bytesRead, delayUs);
            if (delayUs > 2000000ll) {
                FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
                        bytesRead, (double)delayUs / 1.0e6);
            }
        }

        connectHTTP = false;

        CHECK(buffer != NULL);

        size_t size = buffer->size();
        // Set decryption range.
buffer->setRange(size - bytesRead, bytesRead); status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer, buffer->offset() == 0 /* first */); // Unset decryption range. buffer->setRange(0, size); if (err != OK) { ALOGE("decryptBuffer failed w/ error %d", err); notifyError(err); return; } bool startUp = mStartup; // save current start up state err = OK; if (bufferStartsWithTsSyncByte(buffer)) { // Incremental extraction is only supported for MPEG2 transport streams. if (tsBuffer == NULL) { tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); tsBuffer->setRange(0, 0); } else if (tsBuffer->capacity() != buffer->capacity()) { size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); tsBuffer->setRange(tsOff, tsSize); } tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); err = extractAndQueueAccessUnitsFromTs(tsBuffer); } if (err == -EAGAIN) { // starting sequence number too low/high mTSParser.clear(); for (size_t i = 0; i < mPacketSources.size(); i++) { sp packetSource = mPacketSources.valueAt(i); packetSource->clear(); } postMonitorQueue(); return; } else if (err == ERROR_OUT_OF_RANGE) { // reached stopping point notifyStopReached(); return; } else if (err != OK) { notifyError(err); return; } // If we're switching, post start notification // this should only be posted when the last chunk is full processed by TSParser if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) { CHECK(mStartTimeUsNotify != NULL); mStartTimeUsNotify->post(); mStartTimeUsNotify.clear(); shouldPause = true; } if (shouldPause || shouldPauseDownload()) { // save state and return if this is not the last chunk, // leaving the fetcher in paused state. 
if (bytesRead != 0) { mDownloadState->saveState( uri, itemMeta, buffer, tsBuffer, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); return; } shouldPause = true; } } while (bytesRead != 0); if (bufferStartsWithTsSyncByte(buffer)) { // If we don't see a stream in the program table after fetching a full ts segment // mark it as nonexistent. ATSParser::SourceType srcTypes[] = { ATSParser::VIDEO, ATSParser::AUDIO }; LiveSession::StreamType streamTypes[] = { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO }; const size_t kNumTypes = NELEM(srcTypes); for (size_t i = 0; i < kNumTypes; i++) { ATSParser::SourceType srcType = srcTypes[i]; LiveSession::StreamType streamType = streamTypes[i]; sp source = static_cast( mTSParser->getSource(srcType).get()); if (!mTSParser->hasSource(srcType)) { ALOGW("MPEG2 Transport stream does not contain %s data.", srcType == ATSParser::VIDEO ? "video" : "audio"); mStreamTypeMask &= ~streamType; mPacketSources.removeItem(streamType); } } } if (checkDecryptPadding(buffer) != OK) { ALOGE("Incorrect padding bytes after decryption."); notifyError(ERROR_MALFORMED); return; } if (tsBuffer != NULL) { AString method; CHECK(buffer->meta()->findString("cipher-method", &method)); if ((tsBuffer->size() > 0 && method == "NONE") || tsBuffer->size() > 16) { ALOGE("MPEG2 transport stream is not an even multiple of 188 " "bytes in length."); notifyError(ERROR_MALFORMED); return; } } // bulk extract non-ts files bool startUp = mStartup; if (tsBuffer == NULL) { status_t err = extractAndQueueAccessUnits(buffer, itemMeta); if (err == -EAGAIN) { // starting sequence number too low/high postMonitorQueue(); return; } else if (err == ERROR_OUT_OF_RANGE) { // reached stopping point notifyStopReached(); return; } else if (err != OK) { notifyError(err); return; } } if (checkSwitchBandwidth()) { return; } ++mSeqNumber; // if adapting, pause after found the next starting point if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) { 
CHECK(mStartTimeUsNotify != NULL); mStartTimeUsNotify->post(); mStartTimeUsNotify.clear(); shouldPause = true; } if (!shouldPause) { postMonitorQueue(); } } /* * returns true if we need to adjust mSeqNumber */ bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) { int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber(); int64_t minDiffUs, maxDiffUs; if (mSeekMode == LiveSession::kSeekModeNextSample) { // if the previous fetcher paused in the middle of a segment, we // want to start at a segment that overlaps the last sample minDiffUs = -mPlaylist->getTargetDuration(); maxDiffUs = 0ll; } else { // if the previous fetcher paused at the end of a segment, ideally // we want to start at the segment that's roughly aligned with its // next segment, but if the two variants are not well aligned we // adjust the diff to within (-T/2, T/2) minDiffUs = -mPlaylist->getTargetDuration() / 2; maxDiffUs = mPlaylist->getTargetDuration() / 2; } int32_t oldSeqNumber = mSeqNumber; ssize_t index = mSeqNumber - firstSeqNumberInPlaylist; // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs int64_t diffUs = anchorTimeUs - mStartTimeUs; if (diffUs > maxDiffUs) { while (index > 0 && diffUs > maxDiffUs) { --index; sp itemMeta; CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); int64_t itemDurationUs; CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); diffUs -= itemDurationUs; } } else if (diffUs < minDiffUs) { while (index + 1 < (ssize_t) mPlaylist->size() && diffUs < minDiffUs) { ++index; sp itemMeta; CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); int64_t itemDurationUs; CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); diffUs += itemDurationUs; } } mSeqNumber = firstSeqNumberInPlaylist + index; if (mSeqNumber != oldSeqNumber) { FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]", (long long) anchorTimeUs - mStartTimeUs, (long long) minDiffUs, (long long) maxDiffUs); return true; } return 
false; } int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber(); size_t index = 0; while (index < mPlaylist->size()) { sp itemMeta; CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); size_t curDiscontinuitySeq; CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq)); int32_t seqNumber = firstSeqNumberInPlaylist + index; if (curDiscontinuitySeq == discontinuitySeq) { return seqNumber; } else if (curDiscontinuitySeq > discontinuitySeq) { return seqNumber <= 0 ? 0 : seqNumber - 1; } ++index; } return firstSeqNumberInPlaylist + mPlaylist->size(); } int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { size_t index = 0; int64_t segmentStartUs = 0; while (index < mPlaylist->size()) { sp itemMeta; CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); int64_t itemDurationUs; CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); if (timeUs < segmentStartUs + itemDurationUs) { break; } segmentStartUs += itemDurationUs; ++index; } if (index >= mPlaylist->size()) { index = mPlaylist->size() - 1; } return mPlaylist->getFirstSeqNumber() + index; } const sp &PlaylistFetcher::setAccessUnitProperties( const sp &accessUnit, const sp &source, bool discard) { sp format = source->getFormat(); if (format != NULL) { // for simplicity, store a reference to the format in each unit accessUnit->meta()->setObject("format", format); } if (discard) { accessUnit->meta()->setInt32("discard", discard); } accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS); accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber)); if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs); } 
return accessUnit; } bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) { if (!mFirstPTSValid) { mFirstTimeUs = timeUs; mFirstPTSValid = true; } bool startTimeReached = true; if (mStartTimeUsRelative) { FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld", (long long)timeUs, (long long)mFirstTimeUs, (long long)(timeUs - mFirstTimeUs)); timeUs -= mFirstTimeUs; if (timeUs < 0) { FLOGV("clamp negative timeUs to 0"); timeUs = 0; } startTimeReached = (timeUs >= mStartTimeUs); } return startTimeReached; } status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp &buffer) { if (mTSParser == NULL) { // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); } if (mNextPTSTimeUs >= 0ll) { sp extra = new AMessage; // Since we are using absolute timestamps, signal an offset of 0 to prevent // ATSParser from skewing the timestamps of access units. extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); // When adapting, signal a recent media time to the parser, // so that PTS wrap around is handled for the new variant. if (mStartTimeUs >= 0 && !mStartTimeUsRelative) { extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs); } mTSParser->signalDiscontinuity( ATSParser::DISCONTINUITY_TIME, extra); mNextPTSTimeUs = -1ll; } size_t offset = 0; while (offset + 188 <= buffer->size()) { status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); if (err != OK) { return err; } offset += 188; } // setRange to indicate consumed bytes. 
buffer->setRange(buffer->offset() + offset, buffer->size() - offset); if (mSegmentFirstPTS < 0ll) { // get the smallest first PTS from all streams present in this parser for (size_t i = mPacketSources.size(); i > 0;) { i--; const LiveSession::StreamType stream = mPacketSources.keyAt(i); if (stream == LiveSession::STREAMTYPE_SUBTITLES) { ALOGE("MPEG2 Transport streams do not contain subtitles."); return ERROR_MALFORMED; } if (stream == LiveSession::STREAMTYPE_METADATA) { continue; } ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream); sp source = static_cast( mTSParser->getSource(type).get()); if (source == NULL) { continue; } sp meta = source->getMetaAfterLastDequeued(0); if (meta != NULL) { int64_t timeUs; CHECK(meta->findInt64("timeUs", &timeUs)); if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) { mSegmentFirstPTS = timeUs; } } } if (mSegmentFirstPTS < 0ll) { // didn't find any TS packet, can return early return OK; } if (!mStartTimeUsRelative) { // mStartup // mStartup is true until we have queued a packet for all the streams // we are fetching. We queue packets whose timestamps are greater than // mStartTimeUs. // mSegmentStartTimeUs >= 0 // mSegmentStartTimeUs is non-negative when adapting or switching tracks // adjustSeqNumberWithAnchorTime(timeUs) == true // we guessed a seq number that's either too large or too small. // If this happens, we'll adjust mSeqNumber and restart fetching from new // location. Note that we only want to adjust once, so set mSegmentStartTimeUs // to -1 so that we don't enter this chunk next time. 
if (mStartup && mSegmentStartTimeUs >= 0 && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) { mStartTimeUsNotify = mNotify->dup(); mStartTimeUsNotify->setInt32("what", kWhatStartedAt); mStartTimeUsNotify->setString("uri", mURI); mIDRFound = false; mSegmentStartTimeUs = -1; return -EAGAIN; } } } status_t err = OK; for (size_t i = mPacketSources.size(); i > 0;) { i--; sp packetSource = mPacketSources.valueAt(i); const LiveSession::StreamType stream = mPacketSources.keyAt(i); if (stream == LiveSession::STREAMTYPE_SUBTITLES) { ALOGE("MPEG2 Transport streams do not contain subtitles."); return ERROR_MALFORMED; } const char *key = LiveSession::getKeyForStream(stream); ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream); sp source = static_cast( mTSParser->getSource(type).get()); if (source == NULL) { continue; } const char *mime; sp format = source->getFormat(); bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime) && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC); sp accessUnit; status_t finalResult; while (source->hasBufferAvailable(&finalResult) && source->dequeueAccessUnit(&accessUnit) == OK) { int64_t timeUs; CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); if (mStartup) { bool startTimeReached = isStartTimeReached(timeUs); if (!startTimeReached || (isAvc && !mIDRFound)) { // buffer up to the closest preceding IDR frame in the next segement, // or the closest succeeding IDR frame after the exact position FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d", (long long)timeUs, (long long)mStartTimeUs, (long long)timeUs - mStartTimeUs, mIDRFound); if (isAvc) { if (IsIDR(accessUnit)) { mVideoBuffer->clear(); FSLOGV(stream, "found IDR, clear mVideoBuffer"); mIDRFound = true; } if (mIDRFound && mStartTimeUsRelative && !startTimeReached) { mVideoBuffer->queueAccessUnit(accessUnit); FSLOGV(stream, "saving AVC video AccessUnit"); } } if (!startTimeReached || (isAvc && !mIDRFound)) { continue; } } } if 
(mStartTimeUsNotify != NULL) { uint32_t streamMask = 0; mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); if ((mStreamTypeMask & mPacketSources.keyAt(i)) && !(streamMask & mPacketSources.keyAt(i))) { streamMask |= mPacketSources.keyAt(i); mStartTimeUsNotify->setInt32("streamMask", streamMask); FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x", (long long)timeUs, streamMask); if (streamMask == mStreamTypeMask) { FLOGV("found start point for all streams"); mStartup = false; } } } if (mStopParams != NULL) { int32_t discontinuitySeq; int64_t stopTimeUs; if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) || discontinuitySeq > mDiscontinuitySeq || !mStopParams->findInt64(key, &stopTimeUs) || (discontinuitySeq == mDiscontinuitySeq && timeUs >= stopTimeUs)) { FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs); mStreamTypeMask &= ~stream; mPacketSources.removeItemsAt(i); break; } } if (stream == LiveSession::STREAMTYPE_VIDEO) { const bool discard = true; status_t status; while (mVideoBuffer->hasBufferAvailable(&status)) { sp videoBuffer; mVideoBuffer->dequeueAccessUnit(&videoBuffer); setAccessUnitProperties(videoBuffer, source, discard); packetSource->queueAccessUnit(videoBuffer); int64_t bufferTimeUs; CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs)); FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld", (long long)bufferTimeUs); } } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) { mHasMetadata = true; sp notify = mNotify->dup(); notify->setInt32("what", kWhatMetadataDetected); notify->post(); } setAccessUnitProperties(accessUnit, source); packetSource->queueAccessUnit(accessUnit); FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs); } if (err != OK) { break; } } if (err != OK) { for (size_t i = mPacketSources.size(); i > 0;) { i--; sp packetSource = mPacketSources.valueAt(i); packetSource->clear(); } return err; } if (!mStreamTypeMask) { // Signal 
gap is filled between original and new stream. FLOGV("reached stop point for all streams"); return ERROR_OUT_OF_RANGE; } return OK; } /* static */ bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( const sp &buffer) { size_t pos = 0; // skip possible BOM if (buffer->size() >= pos + 3 && !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { pos += 3; } // accept WEBVTT followed by SPACE, TAB or (CR) LF if (buffer->size() < pos + 6 || memcmp("WEBVTT", buffer->data() + pos, 6)) { return false; } pos += 6; if (buffer->size() == pos) { return true; } uint8_t sep = buffer->data()[pos]; return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; } status_t PlaylistFetcher::extractAndQueueAccessUnits( const sp &buffer, const sp &itemMeta) { if (bufferStartsWithWebVTTMagicSequence(buffer)) { if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { ALOGE("This stream only contains subtitles."); return ERROR_MALFORMED; } const sp packetSource = mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); int64_t durationUs; CHECK(itemMeta->findInt64("durationUs", &durationUs)); buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); buffer->meta()->setInt64("durationUs", durationUs); buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration); packetSource->queueAccessUnit(buffer); return OK; } if (mNextPTSTimeUs >= 0ll) { mNextPTSTimeUs = -1ll; } // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio // stream prefixed by an ID3 tag. bool firstID3Tag = true; uint64_t PTS = 0; for (;;) { // Make sure to skip all ID3 tags preceding the audio data. // At least one must be present to provide the PTS timestamp. 
ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); if (!id3.isValid()) { if (firstID3Tag) { ALOGE("Unable to parse ID3 tag."); return ERROR_MALFORMED; } else { break; } } if (firstID3Tag) { bool found = false; ID3::Iterator it(id3, "PRIV"); while (!it.done()) { size_t length; const uint8_t *data = it.getData(&length); if (!data) { return ERROR_MALFORMED; } static const char *kMatchName = "com.apple.streaming.transportStreamTimestamp"; static const size_t kMatchNameLen = strlen(kMatchName); if (length == kMatchNameLen + 1 + 8 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { found = true; PTS = U64_AT(&data[kMatchNameLen + 1]); } it.next(); } if (!found) { ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); return ERROR_MALFORMED; } } // skip the ID3 tag buffer->setRange( buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize()); firstID3Tag = false; } if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { ALOGW("This stream only contains audio data!"); mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; if (mStreamTypeMask == 0) { return OK; } } sp packetSource = mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); if (packetSource->getFormat() == NULL && buffer->size() >= 7) { ABitReader bits(buffer->data(), buffer->size()); // adts_fixed_header CHECK_EQ(bits.getBits(12), 0xfffu); bits.skipBits(3); // ID, layer bool protection_absent __unused = bits.getBits(1) != 0; unsigned profile = bits.getBits(2); CHECK_NE(profile, 3u); unsigned sampling_freq_index = bits.getBits(4); bits.getBits(1); // private_bit unsigned channel_configuration = bits.getBits(3); CHECK_NE(channel_configuration, 0u); bits.skipBits(2); // original_copy, home sp meta = MakeAACCodecSpecificData( profile, sampling_freq_index, channel_configuration); meta->setInt32(kKeyIsADTS, true); packetSource->setFormat(meta); } int64_t numSamples = 0ll; int32_t sampleRate; CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); int64_t timeUs = 
(PTS * 100ll) / 9ll; if (mStartup && !mFirstPTSValid) { mFirstPTSValid = true; mFirstTimeUs = timeUs; } if (mSegmentFirstPTS < 0ll) { mSegmentFirstPTS = timeUs; if (!mStartTimeUsRelative) { // Duplicated logic from how we handle .ts playlists. if (mStartup && mSegmentStartTimeUs >= 0 && adjustSeqNumberWithAnchorTime(timeUs)) { mSegmentStartTimeUs = -1; return -EAGAIN; } } } size_t offset = 0; while (offset < buffer->size()) { const uint8_t *adtsHeader = buffer->data() + offset; CHECK_LT(offset + 5, buffer->size()); unsigned aac_frame_length = ((adtsHeader[3] & 3) << 11) | (adtsHeader[4] << 3) | (adtsHeader[5] >> 5); if (aac_frame_length == 0) { const uint8_t *id3Header = adtsHeader; if (!memcmp(id3Header, "ID3", 3)) { ID3 id3(id3Header, buffer->size() - offset, true); if (id3.isValid()) { offset += id3.rawSize(); continue; }; } return ERROR_MALFORMED; } CHECK_LE(offset + aac_frame_length, buffer->size()); int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; offset += aac_frame_length; // Each AAC frame encodes 1024 samples. 
numSamples += 1024; if (mStartup) { int64_t startTimeUs = unitTimeUs; if (mStartTimeUsRelative) { startTimeUs -= mFirstTimeUs; if (startTimeUs < 0) { startTimeUs = 0; } } if (startTimeUs < mStartTimeUs) { continue; } if (mStartTimeUsNotify != NULL) { mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); mStartup = false; } } if (mStopParams != NULL) { int32_t discontinuitySeq; int64_t stopTimeUs; if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) || discontinuitySeq > mDiscontinuitySeq || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { mStreamTypeMask = 0; mPacketSources.clear(); return ERROR_OUT_OF_RANGE; } } sp unit = new ABuffer(aac_frame_length); memcpy(unit->data(), adtsHeader, aac_frame_length); unit->meta()->setInt64("timeUs", unitTimeUs); setAccessUnitProperties(unit, packetSource); packetSource->queueAccessUnit(unit); } return OK; } void PlaylistFetcher::updateDuration() { int64_t durationUs = 0ll; for (size_t index = 0; index < mPlaylist->size(); ++index) { sp itemMeta; CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); int64_t itemDurationUs; CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); durationUs += itemDurationUs; } sp msg = mNotify->dup(); msg->setInt32("what", kWhatDurationUpdate); msg->setInt64("durationUs", durationUs); msg->post(); } void PlaylistFetcher::updateTargetDuration() { sp msg = mNotify->dup(); msg->setInt32("what", kWhatTargetDurationUpdate); msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration()); msg->post(); } } // namespace android