diff options
Diffstat (limited to 'media')
27 files changed, 419 insertions, 484 deletions
diff --git a/media/libmedia/AudioTrack.cpp b/media/libmedia/AudioTrack.cpp index 0ad9cc0..c775e7b 100644 --- a/media/libmedia/AudioTrack.cpp +++ b/media/libmedia/AudioTrack.cpp @@ -299,6 +299,9 @@ status_t AudioTrack::set( ALOGV("Building AudioTrack with attributes: usage=%d content=%d flags=0x%x tags=[%s]", mAttributes.usage, mAttributes.content_type, mAttributes.flags, mAttributes.tags); mStreamType = AUDIO_STREAM_DEFAULT; + if ((mAttributes.flags & AUDIO_FLAG_HW_AV_SYNC) != 0) { + flags = (audio_output_flags_t)(flags | AUDIO_OUTPUT_FLAG_HW_AV_SYNC); + } } // these below should probably come from the audioFlinger too... diff --git a/media/libmediaplayerservice/MediaPlayerService.cpp b/media/libmediaplayerservice/MediaPlayerService.cpp index 3a399af..f113e21 100644 --- a/media/libmediaplayerservice/MediaPlayerService.cpp +++ b/media/libmediaplayerservice/MediaPlayerService.cpp @@ -442,6 +442,9 @@ status_t MediaPlayerService::dump(int fd, const Vector<String16>& args) const size_t SIZE = 256; char buffer[SIZE]; String8 result; + SortedVector< sp<Client> > clients; //to serialise the mutex unlock & client destruction. + SortedVector< sp<MediaRecorderClient> > mediaRecorderClients; + if (checkCallingPermission(String16("android.permission.DUMP")) == false) { snprintf(buffer, SIZE, "Permission Denial: " "can't dump MediaPlayerService from pid=%d, uid=%d\n", @@ -453,6 +456,7 @@ status_t MediaPlayerService::dump(int fd, const Vector<String16>& args) for (int i = 0, n = mClients.size(); i < n; ++i) { sp<Client> c = mClients[i].promote(); if (c != 0) c->dump(fd, args); + clients.add(c); } if (mMediaRecorderClients.size() == 0) { result.append(" No media recorder client\n\n"); @@ -465,6 +469,7 @@ status_t MediaPlayerService::dump(int fd, const Vector<String16>& args) write(fd, result.string(), result.size()); result = "\n"; c->dump(fd, args); + mediaRecorderClients.add(c); } } } diff --git a/media/libmediaplayerservice/nuplayer/Android.mk b/media/libmediaplayerservice/nuplayer/Android.mk index e2c72ed..6609874 100644 --- a/media/libmediaplayerservice/nuplayer/Android.mk +++ b/media/libmediaplayerservice/nuplayer/Android.mk @@ -4,7 +4,6 @@ include $(CLEAR_VARS) LOCAL_SRC_FILES:= \ GenericSource.cpp \ HTTPLiveSource.cpp \ - MediaClock.cpp \ NuPlayer.cpp \ NuPlayerCCDecoder.cpp \ NuPlayerDecoder.cpp \ diff --git a/media/libmediaplayerservice/nuplayer/GenericSource.cpp b/media/libmediaplayerservice/nuplayer/GenericSource.cpp index 24163e8..70ae85e 100644 --- a/media/libmediaplayerservice/nuplayer/GenericSource.cpp +++ b/media/libmediaplayerservice/nuplayer/GenericSource.cpp @@ -888,7 +888,7 @@ void NuPlayer::GenericSource::onMessageReceived(const sp<AMessage> &msg) { mVideoTrack.mPackets->clear(); } sp<AMessage> response = new AMessage; - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); break; @@ -990,7 +990,7 @@ void NuPlayer::GenericSource::onGetFormatMeta(sp<AMessage> msg) const { sp<MetaData> format = doGetFormatMeta(audio); response->setPointer("format", format.get()); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); } @@ -1153,7 +1153,7 @@ void NuPlayer::GenericSource::onGetSelectedTrack(sp<AMessage> msg) const { ssize_t index = doGetSelectedTrack(type); response->setInt32("index", index); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); } @@ -1211,7 +1211,7 @@ void NuPlayer::GenericSource::onSelectTrack(sp<AMessage> msg) { status_t err = doSelectTrack(trackIndex, select, timeUs); response->setInt32("err", err); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); } @@ -1324,7 +1324,7 @@ void NuPlayer::GenericSource::onSeek(sp<AMessage> msg) { status_t err = doSeek(seekTimeUs); response->setInt32("err", err); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); } diff --git a/media/libmediaplayerservice/nuplayer/MediaClock.h b/media/libmediaplayerservice/nuplayer/MediaClock.h deleted file mode 100644 index 660764f..0000000 --- a/media/libmediaplayerservice/nuplayer/MediaClock.h +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (C) 2015 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef MEDIA_CLOCK_H_ - -#define MEDIA_CLOCK_H_ - -#include <media/stagefright/foundation/ABase.h> -#include <utils/Mutex.h> -#include <utils/RefBase.h> - -namespace android { - -struct AMessage; - -struct MediaClock : public RefBase { - MediaClock(); - - void setStartingTimeMedia(int64_t startingTimeMediaUs); - - void clearAnchor(); - // It's required to use timestamp of just rendered frame as - // anchor time in paused state. - void updateAnchor( - int64_t anchorTimeMediaUs, - int64_t anchorTimeRealUs, - int64_t maxTimeMediaUs = INT64_MAX); - - void updateMaxTimeMedia(int64_t maxTimeMediaUs); - - void setPlaybackRate(float rate); - - // query media time corresponding to real time |realUs|, and save the - // result in |outMediaUs|. - status_t getMediaTime(int64_t realUs, - int64_t *outMediaUs, - bool allowPastMaxTime = false); - // query real time corresponding to media time |targetMediaUs|. - // The result is saved in |outRealUs|. - status_t getRealTimeFor(int64_t targetMediaUs, int64_t *outRealUs); - -protected: - virtual ~MediaClock(); - -private: - status_t getMediaTime_l(int64_t realUs, - int64_t *outMediaUs, - bool allowPastMaxTime); - - Mutex mLock; - - int64_t mAnchorTimeMediaUs; - int64_t mAnchorTimeRealUs; - int64_t mMaxTimeMediaUs; - int64_t mStartingTimeMediaUs; - - float mPlaybackRate; - - DISALLOW_EVIL_CONSTRUCTORS(MediaClock); -}; - -} // namespace android - -#endif // MEDIA_CLOCK_H_ diff --git a/media/libmediaplayerservice/nuplayer/NuPlayer.cpp b/media/libmediaplayerservice/nuplayer/NuPlayer.cpp index 007fbb9..f4d3794 100644 --- a/media/libmediaplayerservice/nuplayer/NuPlayer.cpp +++ b/media/libmediaplayerservice/nuplayer/NuPlayer.cpp @@ -408,7 +408,7 @@ void NuPlayer::onMessageReceived(const sp<AMessage> &msg) { case kWhatGetTrackInfo: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); Parcel* reply; @@ -461,7 +461,7 @@ void NuPlayer::onMessageReceived(const sp<AMessage> &msg) { sp<AMessage> response = new AMessage; response->setInt32("err", err); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); break; @@ -469,7 +469,7 @@ void NuPlayer::onMessageReceived(const sp<AMessage> &msg) { case kWhatSelectTrack: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); size_t trackIndex; diff --git a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderBase.cpp b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderBase.cpp index 0554af9..4636f0a 100644 --- a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderBase.cpp +++ b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderBase.cpp @@ -136,7 +136,7 @@ void NuPlayer::DecoderBase::onMessageReceived(const sp<AMessage> &msg) { case kWhatGetInputBuffers: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); Vector<sp<ABuffer> > *dstBuffers; diff --git a/media/libmediaplayerservice/nuplayer/NuPlayerRenderer.cpp b/media/libmediaplayerservice/nuplayer/NuPlayerRenderer.cpp index 337f7b1..4bccfa8 100644 --- a/media/libmediaplayerservice/nuplayer/NuPlayerRenderer.cpp +++ b/media/libmediaplayerservice/nuplayer/NuPlayerRenderer.cpp @@ -20,13 +20,12 @@ #include "NuPlayerRenderer.h" -#include "MediaClock.h" - #include <media/stagefright/foundation/ABuffer.h> #include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/AMessage.h> #include <media/stagefright/foundation/AUtils.h> #include <media/stagefright/foundation/AWakeLock.h> +#include <media/stagefright/MediaClock.h> #include <media/stagefright/MediaErrors.h> #include <media/stagefright/MetaData.h> #include <media/stagefright/Utils.h> @@ -265,7 +264,7 @@ void NuPlayer::Renderer::onMessageReceived(const sp<AMessage> &msg) { response->setInt32("err", err); response->setInt32("offload", offloadingAudio()); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); @@ -274,7 +273,7 @@ void NuPlayer::Renderer::onMessageReceived(const sp<AMessage> &msg) { case kWhatCloseAudioSink: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); onCloseAudioSink(); @@ -716,6 +715,7 @@ void NuPlayer::Renderer::onNewAudioMediaTime(int64_t mediaTimeUs) { int64_t nowUs = ALooper::GetNowUs(); int64_t nowMediaUs = mediaTimeUs - getPendingAudioPlayoutDurationUs(nowUs); mMediaClock->updateAnchor(nowMediaUs, nowUs, mediaTimeUs); + mAnchorNumFramesWritten = mNumFramesWritten; mAnchorTimeMediaUs = mediaTimeUs; } diff --git a/media/libmediaplayerservice/nuplayer/RTSPSource.cpp b/media/libmediaplayerservice/nuplayer/RTSPSource.cpp index 57d9ab2..5210fc8 100644 --- a/media/libmediaplayerservice/nuplayer/RTSPSource.cpp +++ b/media/libmediaplayerservice/nuplayer/RTSPSource.cpp @@ -311,7 +311,7 @@ void NuPlayer::RTSPSource::performSeek(int64_t seekTimeUs) { void NuPlayer::RTSPSource::onMessageReceived(const sp<AMessage> &msg) { if (msg->what() == kWhatDisconnect) { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); mDisconnectReplyID = replyID; diff --git a/media/libmediaplayerservice/nuplayer/RTSPSource.h b/media/libmediaplayerservice/nuplayer/RTSPSource.h index ac3299a..5f2cf33 100644 --- a/media/libmediaplayerservice/nuplayer/RTSPSource.h +++ b/media/libmediaplayerservice/nuplayer/RTSPSource.h @@ -25,6 +25,7 @@ namespace android { struct ALooper; +struct AReplyToken; struct AnotherPacketSource; struct MyHandler; struct SDPLoader; @@ -96,7 +97,7 @@ private: bool mIsSDP; State mState; status_t mFinalResult; - uint32_t mDisconnectReplyID; + sp<AReplyToken> mDisconnectReplyID; Mutex mBufferingLock; bool mBuffering; diff --git a/media/libstagefright/Android.mk b/media/libstagefright/Android.mk index 6d9bbae..38f2e34 100644 --- a/media/libstagefright/Android.mk +++ b/media/libstagefright/Android.mk @@ -31,6 +31,7 @@ LOCAL_SRC_FILES:= \ MediaAdapter.cpp \ MediaBuffer.cpp \ MediaBufferGroup.cpp \ + MediaClock.cpp \ MediaCodec.cpp \ MediaCodecList.cpp \ MediaCodecSource.cpp \ diff --git a/media/libstagefright/HTTPBase.cpp b/media/libstagefright/HTTPBase.cpp index 0c2ff15..77a652a 100644 --- a/media/libstagefright/HTTPBase.cpp +++ b/media/libstagefright/HTTPBase.cpp @@ -75,7 +75,11 @@ void HTTPBase::addBandwidthMeasurement( bool HTTPBase::estimateBandwidth(int32_t *bandwidth_bps) { Mutex::Autolock autoLock(mLock); - if (mNumBandwidthHistoryItems < 2) { + // Do not do bandwidth estimation if we don't have enough samples, or + // total bytes download are too small (<64K). + // Bandwidth estimation from these samples can often shoot up and cause + // unwanted bw adaption behaviors. + if (mNumBandwidthHistoryItems < 2 || mTotalTransferBytes < 65536) { return false; } diff --git a/media/libmediaplayerservice/nuplayer/MediaClock.cpp b/media/libstagefright/MediaClock.cpp index 9152da1..38db5e4 100644 --- a/media/libmediaplayerservice/nuplayer/MediaClock.cpp +++ b/media/libstagefright/MediaClock.cpp @@ -18,7 +18,7 @@ #define LOG_TAG "MediaClock" #include <utils/Log.h> -#include "MediaClock.h" +#include <media/stagefright/MediaClock.h> #include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/ALooper.h> diff --git a/media/libstagefright/MediaCodec.cpp b/media/libstagefright/MediaCodec.cpp index 6282813..0597f1d 100644 --- a/media/libstagefright/MediaCodec.cpp +++ b/media/libstagefright/MediaCodec.cpp @@ -174,7 +174,7 @@ status_t MediaCodec::PostAndAwaitResponse( } // static -void MediaCodec::PostReplyWithError(int32_t replyID, int32_t err) { +void MediaCodec::PostReplyWithError(const sp<AReplyToken> &replyID, int32_t err) { sp<AMessage> response = new AMessage; response->setInt32("err", err); response->postReply(replyID); @@ -650,7 +650,7 @@ void MediaCodec::cancelPendingDequeueOperations() { } } -bool MediaCodec::handleDequeueInputBuffer(uint32_t replyID, bool newRequest) { +bool MediaCodec::handleDequeueInputBuffer(const sp<AReplyToken> &replyID, bool newRequest) { if (!isExecuting() || (mFlags & kFlagIsAsync) || (newRequest && (mFlags & kFlagDequeueInputPending))) { PostReplyWithError(replyID, INVALID_OPERATION); @@ -674,7 +674,7 @@ bool MediaCodec::handleDequeueInputBuffer(uint32_t replyID, bool newRequest) { return true; } -bool MediaCodec::handleDequeueOutputBuffer(uint32_t replyID, bool newRequest) { +bool MediaCodec::handleDequeueOutputBuffer(const sp<AReplyToken> &replyID, bool newRequest) { sp<AMessage> response = new AMessage; if (!isExecuting() || (mFlags & kFlagIsAsync) @@ -1198,7 +1198,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatInit: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (mState != UNINITIALIZED) { @@ -1234,7 +1234,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatSetCallback: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (mState == UNINITIALIZED @@ -1266,7 +1266,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatConfigure: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (mState != INITIALIZED) { @@ -1323,7 +1323,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatCreateInputSurface: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); // Must be configured, but can't have been started yet. @@ -1339,7 +1339,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatStart: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (mState == FLUSHED) { @@ -1365,7 +1365,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { State targetState = (msg->what() == kWhatStop) ? INITIALIZED : UNINITIALIZED; - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (!((mFlags & kFlagIsComponentAllocated) && targetState == UNINITIALIZED) // See 1 @@ -1413,7 +1413,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatDequeueInputBuffer: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (mFlags & kFlagIsAsync) { @@ -1474,7 +1474,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatQueueInputBuffer: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (!isExecuting()) { @@ -1493,7 +1493,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatDequeueOutputBuffer: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (mFlags & kFlagIsAsync) { @@ -1548,7 +1548,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatReleaseOutputBuffer: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (!isExecuting()) { @@ -1567,7 +1567,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatSignalEndOfInputStream: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (!isExecuting()) { @@ -1585,7 +1585,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatGetBuffers: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (!isExecuting() || (mFlags & kFlagIsAsync)) { @@ -1619,7 +1619,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatFlush: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (!isExecuting()) { @@ -1645,7 +1645,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { sp<AMessage> format = (msg->what() == kWhatGetOutputFormat ? mOutputFormat : mInputFormat); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if ((mState != CONFIGURED && mState != STARTING && @@ -1682,7 +1682,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatGetName: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (mComponentName.empty()) { @@ -1698,7 +1698,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) { case kWhatSetParameters: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); sp<AMessage> params; diff --git a/media/libstagefright/MediaCodecSource.cpp b/media/libstagefright/MediaCodecSource.cpp index 1ef4170..b6fa810 100644 --- a/media/libstagefright/MediaCodecSource.cpp +++ b/media/libstagefright/MediaCodecSource.cpp @@ -182,7 +182,7 @@ void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) { sp<AMessage> response = new AMessage; response->setInt32("err", err); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); break; @@ -491,7 +491,7 @@ void MediaCodecSource::signalEOS(status_t err) { if (mStopping && mEncoderReachedEOS) { ALOGI("encoder (%s) stopped", mIsVideo ? "video" : "audio"); // posting reply to everyone that's waiting - List<uint32_t>::iterator it; + List<sp<AReplyToken>>::iterator it; for (it = mStopReplyIDQueue.begin(); it != mStopReplyIDQueue.end(); it++) { (new AMessage)->postReply(*it); @@ -766,7 +766,7 @@ void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) { } case kWhatStart: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); sp<RefBase> obj; @@ -782,7 +782,7 @@ void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) { { ALOGI("encoder (%s) stopping", mIsVideo ? "video" : "audio"); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); if (mEncoderReachedEOS) { diff --git a/media/libstagefright/foundation/ALooper.cpp b/media/libstagefright/foundation/ALooper.cpp index 617d32b..90b5f68 100644 --- a/media/libstagefright/foundation/ALooper.cpp +++ b/media/libstagefright/foundation/ALooper.cpp @@ -16,6 +16,9 @@ //#define LOG_NDEBUG 0 #define LOG_TAG "ALooper" + +#include <media/stagefright/foundation/ADebug.h> + #include <utils/Log.h> #include <sys/time.h> @@ -220,4 +223,29 @@ bool ALooper::loop() { return true; } +// to be called by AMessage::postAndAwaitResponse only +sp<AReplyToken> ALooper::createReplyToken() { + return new AReplyToken(this); +} + +// to be called by AMessage::postAndAwaitResponse only +status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) { + // return status in case we want to handle an interrupted wait + Mutex::Autolock autoLock(mRepliesLock); + CHECK(replyToken != NULL); + while (!replyToken->retrieveReply(response)) { + mRepliesCondition.wait(mRepliesLock); + } + return OK; +} + +status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) { + Mutex::Autolock autoLock(mRepliesLock); + status_t err = replyToken->setReply(reply); + if (err == OK) { + mRepliesCondition.broadcast(); + } + return err; +} + } // namespace android diff --git a/media/libstagefright/foundation/ALooperRoster.cpp b/media/libstagefright/foundation/ALooperRoster.cpp index c4e6788..473ce1b 100644 --- a/media/libstagefright/foundation/ALooperRoster.cpp +++ b/media/libstagefright/foundation/ALooperRoster.cpp @@ -30,8 +30,7 @@ namespace android { static bool verboseStats = false; ALooperRoster::ALooperRoster() - : mNextHandlerID(1), - mNextReplyID(1) { + : mNextHandlerID(1) { } ALooper::handler_id ALooperRoster::registerHandler( @@ -100,39 +99,6 @@ void ALooperRoster::unregisterStaleHandlers() { } } -status_t ALooperRoster::postAndAwaitResponse( - const sp<AMessage> &msg, sp<AMessage> *response) { - Mutex::Autolock autoLock(mLock); - - uint32_t replyID = mNextReplyID++; - - msg->setInt32("replyID", replyID); - - status_t err = msg->post(0 /* delayUs */); - if (err != OK) { - response->clear(); - return err; - } - - ssize_t index; - while ((index = mReplies.indexOfKey(replyID)) < 0) { - mRepliesCondition.wait(mLock); - } - - *response = mReplies.valueAt(index); - mReplies.removeItemsAt(index); - - return OK; -} - -void ALooperRoster::postReply(uint32_t replyID, const sp<AMessage> &reply) { - Mutex::Autolock autoLock(mLock); - - CHECK(mReplies.indexOfKey(replyID) < 0); - mReplies.add(replyID, reply); - mRepliesCondition.broadcast(); -} - static void makeFourCC(uint32_t fourcc, char *s) { s[0] = (fourcc >> 24) & 0xff; if (s[0]) { diff --git a/media/libstagefright/foundation/AMessage.cpp b/media/libstagefright/foundation/AMessage.cpp index d4add15..e549ff6 100644 --- a/media/libstagefright/foundation/AMessage.cpp +++ b/media/libstagefright/foundation/AMessage.cpp @@ -37,6 +37,17 @@ namespace android { extern ALooperRoster gLooperRoster; +status_t AReplyToken::setReply(const sp<AMessage> &reply) { + if (mReplied) { + ALOGE("trying to post a duplicate reply"); + return -EBUSY; + } + CHECK(mReply == NULL); + mReply = reply; + mReplied = true; + return OK; +} + AMessage::AMessage(void) : mWhat(0), mTarget(0), @@ -355,24 +366,50 @@ status_t AMessage::post(int64_t delayUs) { } status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) { - return gLooperRoster.postAndAwaitResponse(this, response); + sp<ALooper> looper = mLooper.promote(); + if (looper == NULL) { + ALOGW("failed to post message as target looper for handler %d is gone.", mTarget); + return -ENOENT; + } + + sp<AReplyToken> token = looper->createReplyToken(); + if (token == NULL) { + ALOGE("failed to create reply token"); + return -ENOMEM; + } + setObject("replyID", token); + + looper->post(this, 0 /* delayUs */); + return looper->awaitResponse(token, response); } -void AMessage::postReply(uint32_t replyID) { - gLooperRoster.postReply(replyID, this); +status_t AMessage::postReply(const sp<AReplyToken> &replyToken) { + if (replyToken == NULL) { + ALOGW("failed to post reply to a NULL token"); + return -ENOENT; + } + sp<ALooper> looper = replyToken->getLooper(); + if (looper == NULL) { + ALOGW("failed to post reply as target looper is gone."); + return -ENOENT; + } + return looper->postReply(replyToken, this); } -bool AMessage::senderAwaitsResponse(uint32_t *replyID) const { - int32_t tmp; - bool found = findInt32("replyID", &tmp); +bool AMessage::senderAwaitsResponse(sp<AReplyToken> *replyToken) { + sp<RefBase> tmp; + bool found = findObject("replyID", &tmp); if (!found) { return false; } - *replyID = static_cast<uint32_t>(tmp); + *replyToken = static_cast<AReplyToken *>(tmp.get()); + tmp.clear(); + setObject("replyID", tmp); + // TODO: delete Object instead of setting it to NULL - return true; + return *replyToken != NULL; } sp<AMessage> AMessage::dup() const { diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index dcac765..a8f60a8 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -49,8 +49,13 @@ namespace android { +// static // Number of recently-read bytes to use for bandwidth estimation const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024; +// High water mark to start up switch or report prepared) +const int64_t LiveSession::kHighWaterMark = 8000000ll; +const int64_t LiveSession::kMidWaterMark = 5000000ll; +const int64_t LiveSession::kLowWaterMark = 3000000ll; LiveSession::LiveSession( const sp<AMessage> ¬ify, uint32_t flags, @@ -75,14 +80,14 @@ LiveSession::LiveSession( mSeekReplyID(0), mFirstTimeUsValid(false), mFirstTimeUs(0), - mLastSeekTimeUs(0) { + mLastSeekTimeUs(0), + mPollBufferingGeneration(0) { mStreams[kAudioIndex] = StreamItem("audio"); mStreams[kVideoIndex] = StreamItem("video"); mStreams[kSubtitleIndex] = StreamItem("subtitles"); for (size_t i = 0; i < kMaxStreams; ++i) { - mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mBuffering[i] = false; @@ -97,6 +102,9 @@ LiveSession::LiveSession( } LiveSession::~LiveSession() { + if (mFetcherLooper != NULL) { + mFetcherLooper->stop(); + } } sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { @@ -125,24 +133,7 @@ status_t LiveSession::dequeueAccessUnit( return -EWOULDBLOCK; } - status_t finalResult; - sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); - if (discontinuityQueue->hasBufferAvailable(&finalResult)) { - discontinuityQueue->dequeueAccessUnit(accessUnit); - // seeking, track switching - sp<AMessage> extra; - int64_t timeUs; - if ((*accessUnit)->meta()->findMessage("extra", &extra) - && extra != NULL - && extra->findInt64("timeUs", &timeUs)) { - // seeking only - mLastSeekTimeUs = timeUs; - mDiscontinuityOffsetTimesUs.clear(); - mDiscontinuityAbsStartTimesUs.clear(); - } - return INFO_DISCONTINUITY; - } - + status_t finalResult = OK; sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); ssize_t idx = typeToIndex(stream); @@ -172,7 +163,7 @@ status_t LiveSession::dequeueAccessUnit( if (mBuffering[idx]) { if (mSwitchInProgress || packetSource->isFinished(0) - || packetSource->getEstimatedDurationUs() > targetDurationUs) { + || packetSource->hasBufferAvailable(&finalResult)) { mBuffering[idx] = false; } } @@ -402,7 +393,7 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { case kWhatSeek: { - uint32_t seekReplyID; + sp<AReplyToken> seekReplyID; CHECK(msg->senderAwaitsResponse(&seekReplyID)); mSeekReplyID = seekReplyID; mSeekReply = new AMessage; @@ -429,11 +420,16 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { if (what == PlaylistFetcher::kWhatStopped) { AString uri; CHECK(msg->findString("uri", &uri)); - if (mFetcherInfos.removeItem(uri) < 0) { + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index < 0) { // ignore duplicated kWhatStopped messages. break; } + mFetcherLooper->unregisterHandler( + mFetcherInfos[index].mFetcher->id()); + mFetcherInfos.removeItemsAt(index); + if (mSwitchInProgress) { tryToFinishBandwidthSwitch(); } @@ -443,14 +439,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { CHECK_GT(mContinuationCounter, 0); if (--mContinuationCounter == 0) { mContinuation->post(); - - if (mSeekReplyID != 0) { - CHECK(mSeekReply != NULL); - mSeekReply->setInt32("err", OK); - mSeekReply->postReply(mSeekReplyID); - mSeekReplyID = 0; - mSeekReply.clear(); - } } } break; @@ -464,8 +452,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { int64_t durationUs; CHECK(msg->findInt64("durationUs", &durationUs)); - FetcherInfo *info = &mFetcherInfos.editValueFor(uri); - info->mDurationUs = durationUs; + ssize_t index = mFetcherInfos.indexOfKey(uri); + if (index >= 0) { + FetcherInfo *info = &mFetcherInfos.editValueFor(uri); + info->mDurationUs = durationUs; + } break; } @@ -513,34 +504,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } - case PlaylistFetcher::kWhatTemporarilyDoneFetching: - { - AString uri; - CHECK(msg->findString("uri", &uri)); - - if (mFetcherInfos.indexOfKey(uri) < 0) { - ALOGE("couldn't find uri"); - break; - } - FetcherInfo *info = &mFetcherInfos.editValueFor(uri); - info->mIsPrepared = true; - - if (mInPreparationPhase) { - bool allFetchersPrepared = true; - for (size_t i = 0; i < mFetcherInfos.size(); ++i) { - if (!mFetcherInfos.valueAt(i).mIsPrepared) { - allFetchersPrepared = false; - break; - } - } - - if (allFetchersPrepared) { - postPrepared(OK); - } - } - break; - } - case PlaylistFetcher::kWhatStartedAt: { int32_t switchGeneration; @@ -569,19 +532,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } - case kWhatCheckBandwidth: - { - int32_t generation; - CHECK(msg->findInt32("generation", &generation)); - - if (generation != mCheckBandwidthGeneration) { - break; - } - - onCheckBandwidth(msg); - break; - } - case kWhatChangeConfiguration: { onChangeConfiguration(msg); @@ -612,15 +562,13 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } - case kWhatCheckSwitchDown: + case kWhatPollBuffering: { - onCheckSwitchDown(); - break; - } - - case kWhatSwitchDown: - { - onSwitchDown(); + int32_t generation; + CHECK(msg->findInt32("generation", &generation)); + if (generation == mPollBufferingGeneration) { + onPollBuffering(); + } break; } @@ -691,6 +639,14 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { return; } + // create looper for fetchers + if (mFetcherLooper == NULL) { + mFetcherLooper = new ALooper(); + + mFetcherLooper->setName("Fetcher"); + mFetcherLooper->start(false, false); + } + // We trust the content provider to make a reasonable choice of preferred // initial bandwidth by listing it first in the variant playlist. // At startup we really don't have a good estimate on the available @@ -739,19 +695,20 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { mPlaylist->pickRandomMediaItems(); changeConfiguration( 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); + + schedulePollBuffering(); } void LiveSession::finishDisconnect() { // No reconfiguration is currently pending, make sure none will trigger // during disconnection either. - cancelCheckBandwidthEvent(); // Protect mPacketSources from a swapPacketSource race condition through disconnect. // (finishDisconnect, onFinishDisconnect2) cancelBandwidthSwitch(); - // cancel switch down monitor - mSwitchDownMonitor.clear(); + // cancel buffer polling + cancelPollBuffering(); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); @@ -799,7 +756,7 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { info.mDurationUs = -1ll; info.mIsPrepared = false; info.mToBeRemoved = false; - looper()->registerHandler(info.mFetcher); + mFetcherLooper->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); @@ -1201,19 +1158,6 @@ ssize_t LiveSession::getSelectedTrack(media_track_type type) const { } } -bool LiveSession::canSwitchUp() { - // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. - status_t err = OK; - for (size_t i = 0; i < mPacketSources.size(); ++i) { - sp<AnotherPacketSource> source = mPacketSources.valueAt(i); - int64_t dur = source->getBufferedDurationUs(&err); - if (err == OK && dur > 10000000) { - return true; - } - } - return false; -} - void LiveSession::changeConfiguration( int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. @@ -1296,14 +1240,6 @@ void LiveSession::changeConfiguration( if (mContinuationCounter == 0) { msg->post(); - - if (mSeekReplyID != 0) { - CHECK(mSeekReply != NULL); - mSeekReply->setInt32("err", OK); - mSeekReply->postReply(mSeekReplyID); - mSeekReplyID = 0; - mSeekReply.clear(); - } } } @@ -1323,6 +1259,30 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { // All fetchers are either suspended or have been removed now. + // If we're seeking, clear all packet sources before we report + // seek complete, to prevent decoder from pulling stale data. + int64_t timeUs; + CHECK(msg->findInt64("timeUs", &timeUs)); + + if (timeUs >= 0) { + mLastSeekTimeUs = timeUs; + + for (size_t i = 0; i < mPacketSources.size(); i++) { + mPacketSources.editValueAt(i)->clear(); + } + + mDiscontinuityOffsetTimesUs.clear(); + mDiscontinuityAbsStartTimesUs.clear(); + + if (mSeekReplyID != 0) { + CHECK(mSeekReply != NULL); + mSeekReply->setInt32("err", OK); + mSeekReply->postReply(mSeekReplyID); + mSeekReplyID = 0; + mSeekReply.clear(); + } + } + uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); @@ -1428,19 +1388,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { for (size_t j = 0; j < kMaxStreams; ++j) { if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); - - if (j != kSubtitleIndex) { - ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); - sp<AnotherPacketSource> discontinuityQueue; - discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); - discontinuityQueue->queueDiscontinuity( - ATSParser::DISCONTINUITY_NONE, - NULL, - true); - } } } - FetcherInfo &info = mFetcherInfos.editValueAt(i); if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL || sources[kSubtitleIndex] != NULL) { @@ -1486,15 +1435,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sources[j] = mPacketSources.valueFor(indexToType(j)); if (timeUs >= 0) { - sources[j]->clear(); startTimeUs = timeUs; - - sp<AnotherPacketSource> discontinuityQueue; - sp<AMessage> extra = new AMessage; - extra->setInt64("timeUs", timeUs); - discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); - discontinuityQueue->queueDiscontinuity( - ATSParser::DISCONTINUITY_TIME, extra, true); } else { int32_t type; sp<AMessage> meta; @@ -1532,9 +1473,10 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { if (j == kSubtitleIndex) { break; } - sp<AnotherPacketSource> discontinuityQueue; - discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); - discontinuityQueue->queueDiscontinuity( + + ALOGV("stream[%d]: queue format change", j); + + sources[j]->queueDiscontinuity( ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); } else { // adapting, queue discontinuities after resume @@ -1564,9 +1506,6 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { // All fetchers have now been started, the configuration change // has completed. - cancelCheckBandwidthEvent(); - scheduleCheckBandwidthEvent(); - ALOGV("XXX configuration change completed."); mReconfigurationInProgress = false; if (switching) { @@ -1623,47 +1562,35 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) { tryToFinishBandwidthSwitch(); } -void LiveSession::onCheckSwitchDown() { - if (mSwitchDownMonitor == NULL) { - return; - } - - if (mSwitchInProgress || mReconfigurationInProgress) { - ALOGV("Switch/Reconfig in progress, defer switch down"); - mSwitchDownMonitor->post(1000000ll); - return; - } +void LiveSession::schedulePollBuffering() { + sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); + msg->setInt32("generation", mPollBufferingGeneration); + msg->post(1000000ll); +} - for (size_t i = 0; i < kMaxStreams; ++i) { - int32_t targetDuration; - sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); - sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); +void LiveSession::cancelPollBuffering() { + ++mPollBufferingGeneration; +} - if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { - int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); - int64_t targetDurationUs = targetDuration * 1000000ll; +void LiveSession::onPollBuffering() { + ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " + "mInPreparationPhase %d, mStreamMask 0x%x", + mSwitchInProgress, mReconfigurationInProgress, + mInPreparationPhase, mStreamMask); - if (bufferedDurationUs < targetDurationUs / 3) { - (new AMessage(kWhatSwitchDown, this))->post(); - break; - } + bool low, mid, high; + if (checkBuffering(low, mid, high)) { + if (mInPreparationPhase && mid) { + postPrepared(OK); } - } - - mSwitchDownMonitor->post(1000000ll); -} - -void LiveSession::onSwitchDown() { - if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { - return; - } - ssize_t bandwidthIndex = getBandwidthIndex(); - if (bandwidthIndex < mCurBandwidthIndex) { - changeConfiguration(-1, bandwidthIndex, false); - return; + // don't switch before we report prepared + if (!mInPreparationPhase && (low || high)) { + switchBandwidthIfNeeded(high); + } } + schedulePollBuffering(); } // Mark switch done when: @@ -1688,16 +1615,6 @@ void LiveSession::tryToFinishBandwidthSwitch() { } } -void LiveSession::scheduleCheckBandwidthEvent() { - sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, this); - msg->setInt32("generation", mCheckBandwidthGeneration); - msg->post(10000000ll); -} - -void LiveSession::cancelCheckBandwidthEvent() { - ++mCheckBandwidthGeneration; -} - void LiveSession::cancelBandwidthSwitch() { Mutex::Autolock lock(mSwapMutex); mSwitchGeneration++; @@ -1727,33 +1644,69 @@ void LiveSession::cancelBandwidthSwitch() { } } -bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { - if (mReconfigurationInProgress || mSwitchInProgress) { +bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) { + low = mid = high = false; + + if (mSwitchInProgress || mReconfigurationInProgress) { + ALOGV("Switch/Reconfig in progress, defer buffer polling"); return false; } - if (mCurBandwidthIndex < 0) { - return true; + // TODO: Fine tune low/high mark. + // We also need to pause playback if buffering is too low. + // Currently during underflow, we depend on decoder to starve + // to pause, but A/V could have different buffering left, + // they're not paused together. + // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE + + // Switch down if any of the fetchers are below low mark; + // Switch up if all of the fetchers are over high mark. + size_t activeCount, lowCount, midCount, highCount; + activeCount = lowCount = midCount = highCount = 0; + for (size_t i = 0; i < mPacketSources.size(); ++i) { + // we don't check subtitles for buffering level + if (!(mStreamMask & mPacketSources.keyAt(i) + & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { + continue; + } + // ignore streams that never had any packet queued. + // (it's possible that the variant only has audio or video) + sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); + if (meta == NULL) { + continue; + } + + ++activeCount; + int64_t bufferedDurationUs = + mPacketSources[i]->getEstimatedDurationUs(); + ALOGV("source[%d]: buffered %lld us", i, bufferedDurationUs); + if (bufferedDurationUs < kLowWaterMark) { + ++lowCount; + break; + } else if (bufferedDurationUs > kHighWaterMark) { + ++midCount; + ++highCount; + } else if (bufferedDurationUs > kMidWaterMark) { + ++midCount; + } } - if (bandwidthIndex == (size_t)mCurBandwidthIndex) { - return false; - } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { - return canSwitchUp(); - } else { + if (activeCount > 0) { + high = (highCount == activeCount); + mid = (midCount == activeCount); + low = (lowCount > 0); return true; } + + return false; } -void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { - size_t bandwidthIndex = getBandwidthIndex(); - if (canSwitchBandwidthTo(bandwidthIndex)) { - changeConfiguration(-1ll /* timeUs */, bandwidthIndex); - } else { - // Come back and check again 10 seconds later in case there is nothing to do now. - // If we DO change configuration, once that completes it'll schedule a new - // check bandwidth event with an incremented mCheckBandwidthGeneration. - msg->post(10000000ll); +void LiveSession::switchBandwidthIfNeeded(bool canSwitchUp) { + ssize_t bandwidthIndex = getBandwidthIndex(); + + if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) + || (!canSwitchUp && bandwidthIndex < mCurBandwidthIndex)) { + changeConfiguration(-1, bandwidthIndex, false); } } @@ -1771,10 +1724,8 @@ void LiveSession::postPrepared(status_t err) { notify->post(); mInPreparationPhase = false; - - mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, this); - mSwitchDownMonitor->post(); } + } // namespace android diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h index 2d3a25a..3b0a9a4 100644 --- a/media/libstagefright/httplive/LiveSession.h +++ b/media/libstagefright/httplive/LiveSession.h @@ -26,6 +26,7 @@ namespace android { struct ABuffer; +struct AReplyToken; struct AnotherPacketSource; struct DataSource; struct HTTPBase; @@ -39,10 +40,6 @@ struct LiveSession : public AHandler { // Don't log any URLs. kFlagIncognito = 1, }; - LiveSession( - const sp<AMessage> ¬ify, - uint32_t flags, - const sp<IMediaHTTPService> &httpService); enum StreamIndex { kAudioIndex = 0, @@ -56,6 +53,12 @@ struct LiveSession : public AHandler { STREAMTYPE_VIDEO = 1 << kVideoIndex, STREAMTYPE_SUBTITLES = 1 << kSubtitleIndex, }; + + LiveSession( + const sp<AMessage> ¬ify, + uint32_t flags, + const sp<IMediaHTTPService> &httpService); + status_t dequeueAccessUnit(StreamType stream, sp<ABuffer> *accessUnit); status_t getStreamFormat(StreamType stream, sp<AMessage> *format); @@ -109,11 +112,13 @@ private: kWhatChangeConfiguration3 = 'chC3', kWhatFinishDisconnect2 = 'fin2', kWhatSwapped = 'swap', - kWhatCheckSwitchDown = 'ckSD', - kWhatSwitchDown = 'sDwn', + kWhatPollBuffering = 'poll', }; static const size_t kBandwidthHistoryBytes; + static const int64_t kHighWaterMark; + static const int64_t kMidWaterMark; + static const int64_t kLowWaterMark; struct BandwidthItem { size_t mPlaylistIndex; @@ -168,6 +173,7 @@ private: sp<M3UParser> mPlaylist; + sp<ALooper> mFetcherLooper; KeyedVector<AString, FetcherInfo> mFetcherInfos; uint32_t mStreamMask; @@ -180,7 +186,6 @@ private: // we use this to track reconfiguration progress. uint32_t mSwapMask; - KeyedVector<StreamType, sp<AnotherPacketSource> > mDiscontinuities; KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources; // A second set of packet sources that buffer content for the variant we're switching to. KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2; @@ -203,16 +208,17 @@ private: bool mReconfigurationInProgress; bool mSwitchInProgress; - uint32_t mDisconnectReplyID; - uint32_t mSeekReplyID; + sp<AReplyToken> mDisconnectReplyID; + sp<AReplyToken> mSeekReplyID; bool mFirstTimeUsValid; int64_t mFirstTimeUs; int64_t mLastSeekTimeUs; - sp<AMessage> mSwitchDownMonitor; KeyedVector<size_t, int64_t> mDiscontinuityAbsStartTimesUs; KeyedVector<size_t, int64_t> mDiscontinuityOffsetTimesUs; + int32_t mPollBufferingGeneration; + sp<PlaylistFetcher> addFetcher(const char *uri); void onConnect(const sp<AMessage> &msg); @@ -256,27 +262,24 @@ private: void onChangeConfiguration2(const sp<AMessage> &msg); void onChangeConfiguration3(const sp<AMessage> &msg); void onSwapped(const sp<AMessage> &msg); - void onCheckSwitchDown(); - void onSwitchDown(); void tryToFinishBandwidthSwitch(); - void scheduleCheckBandwidthEvent(); - void cancelCheckBandwidthEvent(); - // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources // from being swapped out on stale discontinuities while manipulating // mPacketSources/mPacketSources2. void cancelBandwidthSwitch(); - bool canSwitchBandwidthTo(size_t bandwidthIndex); - void onCheckBandwidth(const sp<AMessage> &msg); + void schedulePollBuffering(); + void cancelPollBuffering(); + void onPollBuffering(); + bool checkBuffering(bool &low, bool &mid, bool &high); + void switchBandwidthIfNeeded(bool canSwitchUp); void finishDisconnect(); void postPrepared(status_t err); void swapPacketSource(StreamType stream); - bool canSwitchUp(); DISALLOW_EVIL_CONSTRUCTORS(LiveSession); }; diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp index 4ccbf76..3710686 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.cpp +++ b/media/libstagefright/httplive/PlaylistFetcher.cpp @@ -49,6 +49,7 @@ namespace android { // static const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; +const int64_t PlaylistFetcher::kFetcherResumeThreshold = 100000ll; // LCM of 188 (size of a TS packet) & 1k works well const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024; const int32_t PlaylistFetcher::kNumSkipFrames = 5; @@ -535,12 +536,19 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { sp<AMessage> params; CHECK(msg->findMessage("params", ¶ms)); - bool stop = false; + size_t stopCount = 0; for (size_t i = 0; i < mPacketSources.size(); i++) { sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); const char *stopKey; int streamType = mPacketSources.keyAt(i); + + if (streamType == LiveSession::STREAMTYPE_SUBTITLES) { + // the subtitle track can always be stopped + ++stopCount; + continue; + } + switch (streamType) { case LiveSession::STREAMTYPE_VIDEO: stopKey = "timeUsVideo"; @@ -550,15 +558,11 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { stopKey = "timeUsAudio"; break; - case LiveSession::STREAMTYPE_SUBTITLES: - stopKey = "timeUsSubtitle"; - break; - default: TRESPASS(); } - // Don't resume if we would stop within a resume threshold. + // check if this stream has too little data left to be resumed int32_t discontinuitySeq; int64_t latestTimeUs = 0, stopTimeUs = 0; sp<AMessage> latestMeta = packetSource->getLatestEnqueuedMeta(); @@ -567,12 +571,13 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { && discontinuitySeq == mDiscontinuitySeq && latestMeta->findInt64("timeUs", &latestTimeUs) && params->findInt64(stopKey, &stopTimeUs) - && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) { - stop = true; + && stopTimeUs - latestTimeUs < kFetcherResumeThreshold) { + ++stopCount; } } - if (stop) { + // Don't resume if all streams are within a resume threshold + if (stopCount == mPacketSources.size()) { for (size_t i = 0; i < mPacketSources.size(); i++) { mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); } @@ -581,7 +586,7 @@ status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { } mStopParams = params; - postMonitorQueue(); + onDownloadNext(); return OK; } @@ -660,9 +665,6 @@ void PlaylistFetcher::onMonitorQueue() { ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "", bufferedDurationUs, targetDurationUs); - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatTemporarilyDoneFetching); - msg->post(); } if (finalResult == OK && downloadMore) { @@ -676,11 +678,6 @@ void PlaylistFetcher::onMonitorQueue() { msg->post(1000l); } else { // Nothing to do yet, try again in a second. - - sp<AMessage> msg = mNotify->dup(); - msg->setInt32("what", kWhatTemporarilyDoneFetching); - msg->post(); - int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "", delayUs, bufferedDurationUs, durationToBufferUs); @@ -1687,33 +1684,4 @@ void PlaylistFetcher::updateDuration() { msg->post(); } -int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) { - int64_t durationUs; - if (msg->findInt64("durationUs", &durationUs) && durationUs > 0) { - return kNumSkipFrames * durationUs; - } - - sp<RefBase> obj; - msg->findObject("format", &obj); - MetaData *format = static_cast<MetaData *>(obj.get()); - - const char *mime; - CHECK(format->findCString(kKeyMIMEType, &mime)); - bool audio = !strncasecmp(mime, "audio/", 6); - if (audio) { - // Assumes 1000 samples per frame. - int32_t sampleRate; - CHECK(format->findInt32(kKeySampleRate, &sampleRate)); - return kNumSkipFrames /* frames */ * 1000 /* samples */ - * (1000000 / sampleRate) /* sample duration (us) */; - } else { - int32_t frameRate; - if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) { - return kNumSkipFrames * (1000000 / frameRate); - } - } - - return 500000ll; -} - } // namespace android diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h index 4e15f85..2f11949 100644 --- a/media/libstagefright/httplive/PlaylistFetcher.h +++ b/media/libstagefright/httplive/PlaylistFetcher.h @@ -36,6 +36,7 @@ class String8; struct PlaylistFetcher : public AHandler { static const int64_t kMinBufferedDurationUs; static const int32_t kDownloadBlockSize; + static const int64_t kFetcherResumeThreshold; enum { kWhatStarted, @@ -43,7 +44,6 @@ struct PlaylistFetcher : public AHandler { kWhatStopped, kWhatError, kWhatDurationUpdate, - kWhatTemporarilyDoneFetching, kWhatPrepared, kWhatPreparationFailed, kWhatStartedAt, @@ -212,10 +212,6 @@ private: void updateDuration(); - // Before resuming a fetcher in onResume, check the remaining duration is longer than that - // returned by resumeThreshold. - int64_t resumeThreshold(const sp<AMessage> &msg); - DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher); }; diff --git a/media/libstagefright/mpeg2ts/ATSParser.cpp b/media/libstagefright/mpeg2ts/ATSParser.cpp index 934e2e5..6786506 100644 --- a/media/libstagefright/mpeg2ts/ATSParser.cpp +++ b/media/libstagefright/mpeg2ts/ATSParser.cpp @@ -35,6 +35,7 @@ #include <media/stagefright/Utils.h> #include <media/IStreamSource.h> #include <utils/KeyedVector.h> +#include <utils/Vector.h> #include <inttypes.h> @@ -86,6 +87,11 @@ struct ATSParser::Program : public RefBase { } private: + struct StreamInfo { + unsigned mType; + unsigned mPID; + }; + ATSParser *mParser; unsigned mProgramNumber; unsigned mProgramMapPID; @@ -96,6 +102,7 @@ private: status_t parseProgramMap(ABitReader *br); int64_t recoverPTS(uint64_t PTS_33bit); + bool switchPIDs(const Vector<StreamInfo> &infos); DISALLOW_EVIL_CONSTRUCTORS(Program); }; @@ -185,7 +192,7 @@ ATSParser::Program::Program( mProgramMapPID(programMapPID), mFirstPTSValid(false), mFirstPTS(0), - mLastRecoveredPTS(0) { + mLastRecoveredPTS(-1ll) { ALOGV("new program number %u", programNumber); } @@ -240,10 +247,71 @@ void ATSParser::Program::signalEOS(status_t finalResult) { } } -struct StreamInfo { - unsigned mType; - unsigned mPID; -}; +bool ATSParser::Program::switchPIDs(const Vector<StreamInfo> &infos) { + bool success = false; + + if (mStreams.size() == infos.size()) { + // build type->PIDs map for old and new mapping + size_t i; + KeyedVector<int32_t, Vector<int32_t> > oldType2PIDs, newType2PIDs; + for (i = 0; i < mStreams.size(); ++i) { + ssize_t index = oldType2PIDs.indexOfKey(mStreams[i]->type()); + if (index < 0) { + oldType2PIDs.add(mStreams[i]->type(), Vector<int32_t>()); + } + oldType2PIDs.editValueFor(mStreams[i]->type()).push_back(mStreams[i]->pid()); + } + for (i = 0; i < infos.size(); ++i) { + ssize_t index = newType2PIDs.indexOfKey(infos[i].mType); + if (index < 0) { + newType2PIDs.add(infos[i].mType, Vector<int32_t>()); + } + newType2PIDs.editValueFor(infos[i].mType).push_back(infos[i].mPID); + } + + // we can recover if the number of streams for each type hasn't changed + if (oldType2PIDs.size() == newType2PIDs.size()) { + success = true; + for (i = 0; i < oldType2PIDs.size(); ++i) { + // KeyedVector is sorted, we just compare key and size of each index + if (oldType2PIDs.keyAt(i) != newType2PIDs.keyAt(i) + || oldType2PIDs[i].size() != newType2PIDs[i].size()) { + success = false; + break; + } + } + } + + if (success) { + // save current streams to temp + KeyedVector<int32_t, sp<Stream> > temp; + for (i = 0; i < mStreams.size(); ++i) { + temp.add(mStreams.keyAt(i), mStreams.editValueAt(i)); + } + + mStreams.clear(); + for (i = 0; i < temp.size(); ++i) { + // The two checks below shouldn't happen, + // we already checked above the stream count matches + ssize_t index = newType2PIDs.indexOfKey(temp[i]->type()); + CHECK(index >= 0); + Vector<int32_t> &newPIDs = newType2PIDs.editValueAt(index); + CHECK(newPIDs.size() > 0); + + // get the next PID for temp[i]->type() in the new PID map + Vector<int32_t>::iterator it = newPIDs.begin(); + + // change the PID of the stream, and add it back + temp.editValueAt(i)->setPID(*it); + mStreams.add(temp[i]->pid(), temp.editValueAt(i)); + + // removed the used PID + newPIDs.erase(it); + } + } + } + return success; +} status_t ATSParser::Program::parseProgramMap(ABitReader *br) { unsigned table_id = br->getBits(8); @@ -372,39 +440,8 @@ status_t ATSParser::Program::parseProgramMap(ABitReader *br) { } #endif - // The only case we can recover from is if we have two streams - // and they switched PIDs. - - bool success = false; - - if (mStreams.size() == 2 && infos.size() == 2) { - const StreamInfo &info1 = infos.itemAt(0); - const StreamInfo &info2 = infos.itemAt(1); - - sp<Stream> s1 = mStreams.editValueAt(0); - sp<Stream> s2 = mStreams.editValueAt(1); - - bool caseA = - info1.mPID == s1->pid() && info1.mType == s2->type() - && info2.mPID == s2->pid() && info2.mType == s1->type(); - - bool caseB = - info1.mPID == s2->pid() && info1.mType == s1->type() - && info2.mPID == s1->pid() && info2.mType == s2->type(); - - if (caseA || caseB) { - unsigned pid1 = s1->pid(); - unsigned pid2 = s2->pid(); - s1->setPID(pid2); - s2->setPID(pid1); - - mStreams.clear(); - mStreams.add(s1->pid(), s1); - mStreams.add(s2->pid(), s2); - - success = true; - } - } + // we can recover if number of streams for each type remain the same + bool success = switchPIDs(infos); if (!success) { ALOGI("Stream PIDs changed and we cannot recover."); @@ -433,14 +470,25 @@ int64_t ATSParser::Program::recoverPTS(uint64_t PTS_33bit) { // reasonable amount of time. To handle the wrap-around, use fancy math // to get an extended PTS that is within [-0xffffffff, 0xffffffff] // of the latest recovered PTS. - mLastRecoveredPTS = static_cast<int64_t>( - ((mLastRecoveredPTS - PTS_33bit + 0x100000000ll) - & 0xfffffffe00000000ull) | PTS_33bit); - - // We start from 0, but recovered PTS could be slightly below 0. - // Clamp it to 0 as rest of the pipeline doesn't take negative pts. - // (eg. video is read first and starts at 0, but audio starts at 0xfffffff0) - return mLastRecoveredPTS < 0ll ? 0ll : mLastRecoveredPTS; + if (mLastRecoveredPTS < 0ll) { + // Use the original 33bit number for 1st frame, the reason is that + // if 1st frame wraps to negative that's far away from 0, we could + // never start. Only start wrapping around from 2nd frame. + mLastRecoveredPTS = static_cast<int64_t>(PTS_33bit); + } else { + mLastRecoveredPTS = static_cast<int64_t>( + ((mLastRecoveredPTS - PTS_33bit + 0x100000000ll) + & 0xfffffffe00000000ull) | PTS_33bit); + // We start from 0, but recovered PTS could be slightly below 0. + // Clamp it to 0 as rest of the pipeline doesn't take negative pts. + // (eg. video is read first and starts at 0, but audio starts at 0xfffffff0) + if (mLastRecoveredPTS < 0ll) { + ALOGI("Clamping negative recovered PTS (%" PRId64 ") to 0", mLastRecoveredPTS); + mLastRecoveredPTS = 0ll; + } + } + + return mLastRecoveredPTS; } sp<MediaSource> ATSParser::Program::getSource(SourceType type) { @@ -1118,7 +1166,8 @@ status_t ATSParser::parsePID( if (payload_unit_start_indicator) { if (!section->isEmpty()) { - return ERROR_UNSUPPORTED; + ALOGW("parsePID encounters payload_unit_start_indicator when section is not empty"); + section->clear(); } unsigned skip = br->getBits(8); diff --git a/media/libstagefright/wifi-display/source/MediaPuller.cpp b/media/libstagefright/wifi-display/source/MediaPuller.cpp index 927a53c..ce07a4e 100644 --- a/media/libstagefright/wifi-display/source/MediaPuller.cpp +++ b/media/libstagefright/wifi-display/source/MediaPuller.cpp @@ -105,7 +105,7 @@ void MediaPuller::onMessageReceived(const sp<AMessage> &msg) { sp<AMessage> response = new AMessage; response->setInt32("err", err); - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); response->postReply(replyID); break; diff --git a/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp b/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp index 210914e..14d0951 100644 --- a/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp +++ b/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp @@ -57,7 +57,7 @@ WifiDisplaySource::WifiDisplaySource( mNetSession(netSession), mClient(client), mSessionID(0), - mStopReplyID(0), + mStopReplyID(NULL), mChosenRTPPort(-1), mUsingPCMAudio(false), mClientSessionID(0), @@ -138,7 +138,7 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) { switch (msg->what()) { case kWhatStart: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); AString iface; @@ -325,7 +325,7 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) { case kWhatPause: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); status_t err = OK; @@ -345,7 +345,7 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) { case kWhatResume: { - uint32_t replyID; + sp<AReplyToken> replyID; CHECK(msg->senderAwaitsResponse(&replyID)); status_t err = OK; @@ -492,7 +492,7 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) { if (mState == AWAITING_CLIENT_TEARDOWN) { ALOGI("TEARDOWN trigger timed out, forcing disconnection."); - CHECK_NE(mStopReplyID, 0); + CHECK(mStopReplyID != NULL); finishStop(); break; } @@ -1470,7 +1470,7 @@ status_t WifiDisplaySource::onTeardownRequest( mNetSession->sendRequest(sessionID, response.c_str()); if (mState == AWAITING_CLIENT_TEARDOWN) { - CHECK_NE(mStopReplyID, 0); + CHECK(mStopReplyID != NULL); finishStop(); } else { mClient->onDisplayError(IRemoteDisplayClient::kDisplayErrorUnknown); diff --git a/media/libstagefright/wifi-display/source/WifiDisplaySource.h b/media/libstagefright/wifi-display/source/WifiDisplaySource.h index 750265f..0f779e4 100644 --- a/media/libstagefright/wifi-display/source/WifiDisplaySource.h +++ b/media/libstagefright/wifi-display/source/WifiDisplaySource.h @@ -27,6 +27,7 @@ namespace android { +struct AReplyToken; struct IHDCP; struct IRemoteDisplayClient; struct ParsedMessage; @@ -121,7 +122,7 @@ private: struct in_addr mInterfaceAddr; int32_t mSessionID; - uint32_t mStopReplyID; + sp<AReplyToken> mStopReplyID; AString mWfdClientRtpPorts; int32_t mChosenRTPPort; // extracted from "wfd_client_rtp_ports" diff --git a/media/ndk/NdkMediaCodec.cpp b/media/ndk/NdkMediaCodec.cpp index 46757bc..80c1c2f 100644 --- a/media/ndk/NdkMediaCodec.cpp +++ b/media/ndk/NdkMediaCodec.cpp @@ -116,7 +116,7 @@ void CodecHandler::onMessageReceived(const sp<AMessage> &msg) { case kWhatStopActivityNotifications: { - uint32_t replyID; + sp<AReplyToken> replyID; msg->senderAwaitsResponse(&replyID); mCodec->mGeneration++; |