summaryrefslogtreecommitdiffstats
path: root/media/libstagefright
diff options
context:
space:
mode:
authorBill Yi <byi@google.com>2014-04-29 11:34:16 -0700
committerBill Yi <byi@google.com>2014-04-29 11:34:16 -0700
commit1a7be1ec9c769203e6c8e26378de0ab8e2ad493d (patch)
tree68773b199cc4adcb96124270863244477570c78e /media/libstagefright
parentf131f87369ec06fc27fc1f14ea72f0ca1a066509 (diff)
parent91820d46b5f3065c2fded3cdf65d305715b33bb1 (diff)
downloadframeworks_av-1a7be1ec9c769203e6c8e26378de0ab8e2ad493d.zip
frameworks_av-1a7be1ec9c769203e6c8e26378de0ab8e2ad493d.tar.gz
frameworks_av-1a7be1ec9c769203e6c8e26378de0ab8e2ad493d.tar.bz2
Merge commit '91820d46b5f3065c2fded3cdf65d305715b33bb1' into HEAD
Diffstat (limited to 'media/libstagefright')
-rw-r--r--media/libstagefright/ACodec.cpp111
-rw-r--r--media/libstagefright/MPEG4Writer.cpp8
-rw-r--r--media/libstagefright/OMXCodec.cpp40
-rw-r--r--media/libstagefright/chromium_http/Android.mk2
-rw-r--r--media/libstagefright/chromium_http/support.cpp104
-rw-r--r--media/libstagefright/httplive/LiveSession.cpp581
-rw-r--r--media/libstagefright/httplive/LiveSession.h84
-rw-r--r--media/libstagefright/httplive/M3UParser.cpp173
-rw-r--r--media/libstagefright/httplive/M3UParser.h10
-rw-r--r--media/libstagefright/httplive/PlaylistFetcher.cpp778
-rw-r--r--media/libstagefright/httplive/PlaylistFetcher.h55
-rw-r--r--media/libstagefright/matroska/MatroskaExtractor.cpp80
-rw-r--r--media/libstagefright/mpeg2ts/ATSParser.h5
-rw-r--r--media/libstagefright/mpeg2ts/AnotherPacketSource.cpp72
-rw-r--r--media/libstagefright/mpeg2ts/AnotherPacketSource.h3
-rw-r--r--media/libstagefright/omx/GraphicBufferSource.cpp106
-rw-r--r--media/libstagefright/omx/GraphicBufferSource.h25
-rw-r--r--media/libstagefright/omx/OMXNodeInstance.cpp26
-rw-r--r--media/libstagefright/wifi-display/source/TSPacketizer.cpp2
19 files changed, 1781 insertions, 484 deletions
diff --git a/media/libstagefright/ACodec.cpp b/media/libstagefright/ACodec.cpp
index dfe6c56..2d04bbb 100644
--- a/media/libstagefright/ACodec.cpp
+++ b/media/libstagefright/ACodec.cpp
@@ -363,6 +363,7 @@ ACodec::ACodec()
mIsEncoder(false),
mUseMetadataOnEncoderOutput(false),
mShutdownInProgress(false),
+ mIsConfiguredForAdaptivePlayback(false),
mEncoderDelay(0),
mEncoderPadding(0),
mChannelMaskPresent(false),
@@ -370,7 +371,8 @@ ACodec::ACodec()
mDequeueCounter(0),
mStoreMetaDataInOutputBuffers(false),
mMetaDataBuffersToSubmit(0),
- mRepeatFrameDelayUs(-1ll) {
+ mRepeatFrameDelayUs(-1ll),
+ mMaxPtsGapUs(-1l) {
mUninitializedState = new UninitializedState(this);
mLoadedState = new LoadedState(this);
mLoadedToIdleState = new LoadedToIdleState(this);
@@ -452,6 +454,18 @@ void ACodec::signalRequestIDRFrame() {
(new AMessage(kWhatRequestIDRFrame, id()))->post();
}
+// *** NOTE: THE FOLLOWING WORKAROUND WILL BE REMOVED ***
+// Some codecs may return input buffers before having them processed.
+// This causes a halt if we already signaled an EOS on the input
+// port. For now keep submitting an output buffer if there was an
+// EOS on the input port, but not yet on the output port.
+void ACodec::signalSubmitOutputMetaDataBufferIfEOS_workaround() {
+ if (mPortEOS[kPortIndexInput] && !mPortEOS[kPortIndexOutput] &&
+ mMetaDataBuffersToSubmit > 0) {
+ (new AMessage(kWhatSubmitOutputMetaDataBufferIfEOS, id()))->post();
+ }
+}
+
status_t ACodec::allocateBuffersOnPort(OMX_U32 portIndex) {
CHECK(portIndex == kPortIndexInput || portIndex == kPortIndexOutput);
@@ -624,18 +638,34 @@ status_t ACodec::configureOutputBuffersFromNativeWindow(
return err;
}
- // XXX: Is this the right logic to use? It's not clear to me what the OMX
- // buffer counts refer to - how do they account for the renderer holding on
- // to buffers?
- if (def.nBufferCountActual < def.nBufferCountMin + *minUndequeuedBuffers) {
- OMX_U32 newBufferCount = def.nBufferCountMin + *minUndequeuedBuffers;
+ // FIXME: assume that surface is controlled by app (native window
+ // returns the number for the case when surface is not controlled by app)
+ // FIXME2: This means that minUndeqeueudBufs can be 1 larger than reported
+ // For now, try to allocate 1 more buffer, but don't fail if unsuccessful
+
+ // Use conservative allocation while also trying to reduce starvation
+ //
+ // 1. allocate at least nBufferCountMin + minUndequeuedBuffers - that is the
+ // minimum needed for the consumer to be able to work
+ // 2. try to allocate two (2) additional buffers to reduce starvation from
+ // the consumer
+ // plus an extra buffer to account for incorrect minUndequeuedBufs
+ for (OMX_U32 extraBuffers = 2 + 1; /* condition inside loop */; extraBuffers--) {
+ OMX_U32 newBufferCount =
+ def.nBufferCountMin + *minUndequeuedBuffers + extraBuffers;
def.nBufferCountActual = newBufferCount;
err = mOMX->setParameter(
mNode, OMX_IndexParamPortDefinition, &def, sizeof(def));
- if (err != OK) {
- ALOGE("[%s] setting nBufferCountActual to %lu failed: %d",
- mComponentName.c_str(), newBufferCount, err);
+ if (err == OK) {
+ *minUndequeuedBuffers += extraBuffers;
+ break;
+ }
+
+ ALOGW("[%s] setting nBufferCountActual to %lu failed: %d",
+ mComponentName.c_str(), newBufferCount, err);
+ /* exit condition */
+ if (extraBuffers == 0) {
return err;
}
}
@@ -660,6 +690,7 @@ status_t ACodec::allocateOutputBuffersFromNativeWindow() {
&bufferCount, &bufferSize, &minUndequeuedBuffers);
if (err != 0)
return err;
+ mNumUndequeuedBuffers = minUndequeuedBuffers;
ALOGV("[%s] Allocating %lu buffers from a native window of size %lu on "
"output port",
@@ -725,6 +756,7 @@ status_t ACodec::allocateOutputMetaDataBuffers() {
&bufferCount, &bufferSize, &minUndequeuedBuffers);
if (err != 0)
return err;
+ mNumUndequeuedBuffers = minUndequeuedBuffers;
ALOGV("[%s] Allocating %lu meta buffers on output port",
mComponentName.c_str(), bufferCount);
@@ -1096,6 +1128,10 @@ status_t ACodec::configureCodec(
&mRepeatFrameDelayUs)) {
mRepeatFrameDelayUs = -1ll;
}
+
+ if (!msg->findInt64("max-pts-gap-to-encoder", &mMaxPtsGapUs)) {
+ mMaxPtsGapUs = -1l;
+ }
}
// Always try to enable dynamic output buffers on native surface
@@ -1103,6 +1139,7 @@ status_t ACodec::configureCodec(
int32_t haveNativeWindow = msg->findObject("native-window", &obj) &&
obj != NULL;
mStoreMetaDataInOutputBuffers = false;
+ mIsConfiguredForAdaptivePlayback = false;
if (!encoder && video && haveNativeWindow) {
err = mOMX->storeMetaDataInBuffers(mNode, kPortIndexOutput, OMX_TRUE);
if (err != OK) {
@@ -1147,12 +1184,14 @@ status_t ACodec::configureCodec(
ALOGW_IF(err != OK,
"[%s] prepareForAdaptivePlayback failed w/ err %d",
mComponentName.c_str(), err);
+ mIsConfiguredForAdaptivePlayback = (err == OK);
}
// allow failure
err = OK;
} else {
ALOGV("[%s] storeMetaDataInBuffers succeeded", mComponentName.c_str());
mStoreMetaDataInOutputBuffers = true;
+ mIsConfiguredForAdaptivePlayback = true;
}
int32_t push;
@@ -2408,19 +2447,7 @@ void ACodec::waitUntilAllPossibleNativeWindowBuffersAreReturnedToUs() {
return;
}
- int minUndequeuedBufs = 0;
- status_t err = mNativeWindow->query(
- mNativeWindow.get(), NATIVE_WINDOW_MIN_UNDEQUEUED_BUFFERS,
- &minUndequeuedBufs);
-
- if (err != OK) {
- ALOGE("[%s] NATIVE_WINDOW_MIN_UNDEQUEUED_BUFFERS query failed: %s (%d)",
- mComponentName.c_str(), strerror(-err), -err);
-
- minUndequeuedBufs = 0;
- }
-
- while (countBuffersOwnedByNativeWindow() > (size_t)minUndequeuedBufs
+ while (countBuffersOwnedByNativeWindow() > mNumUndequeuedBuffers
&& dequeueBufferFromNativeWindow() != NULL) {
// these buffers will be submitted as regular buffers; account for this
if (mStoreMetaDataInOutputBuffers && mMetaDataBuffersToSubmit > 0) {
@@ -3235,11 +3262,11 @@ void ACodec::BaseState::onInputBufferFilled(const sp<AMessage> &msg) {
mCodec->mInputEOSResult = err;
}
break;
-
- default:
- CHECK_EQ((int)mode, (int)FREE_BUFFERS);
- break;
}
+
+ default:
+ CHECK_EQ((int)mode, (int)FREE_BUFFERS);
+ break;
}
}
@@ -3690,6 +3717,7 @@ void ACodec::LoadedState::stateEntered() {
mCodec->mDequeueCounter = 0;
mCodec->mMetaDataBuffersToSubmit = 0;
mCodec->mRepeatFrameDelayUs = -1ll;
+ mCodec->mIsConfiguredForAdaptivePlayback = false;
if (mCodec->mShutdownInProgress) {
bool keepComponentAllocated = mCodec->mKeepComponentAllocated;
@@ -3838,6 +3866,21 @@ void ACodec::LoadedState::onCreateInputSurface(
}
}
+ if (err == OK && mCodec->mMaxPtsGapUs > 0l) {
+ err = mCodec->mOMX->setInternalOption(
+ mCodec->mNode,
+ kPortIndexInput,
+ IOMX::INTERNAL_OPTION_MAX_TIMESTAMP_GAP,
+ &mCodec->mMaxPtsGapUs,
+ sizeof(mCodec->mMaxPtsGapUs));
+
+ if (err != OK) {
+ ALOGE("[%s] Unable to configure max timestamp gap (err %d)",
+ mCodec->mComponentName.c_str(),
+ err);
+ }
+ }
+
if (err == OK) {
notify->setObject("input-surface",
new BufferProducerWrapper(bufferProducer));
@@ -4036,6 +4079,9 @@ void ACodec::ExecutingState::submitOutputMetaBuffers() {
break;
}
}
+
+ // *** NOTE: THE FOLLOWING WORKAROUND WILL BE REMOVED ***
+ mCodec->signalSubmitOutputMetaDataBufferIfEOS_workaround();
}
void ACodec::ExecutingState::submitRegularOutputBuffers() {
@@ -4184,6 +4230,19 @@ bool ACodec::ExecutingState::onMessageReceived(const sp<AMessage> &msg) {
break;
}
+ // *** NOTE: THE FOLLOWING WORKAROUND WILL BE REMOVED ***
+ case kWhatSubmitOutputMetaDataBufferIfEOS:
+ {
+ if (mCodec->mPortEOS[kPortIndexInput] &&
+ !mCodec->mPortEOS[kPortIndexOutput]) {
+ status_t err = mCodec->submitOutputMetaDataBuffer();
+ if (err == OK) {
+ mCodec->signalSubmitOutputMetaDataBufferIfEOS_workaround();
+ }
+ }
+ return true;
+ }
+
default:
handled = BaseState::onMessageReceived(msg);
break;
diff --git a/media/libstagefright/MPEG4Writer.cpp b/media/libstagefright/MPEG4Writer.cpp
index 900b160..58a4487 100644
--- a/media/libstagefright/MPEG4Writer.cpp
+++ b/media/libstagefright/MPEG4Writer.cpp
@@ -44,7 +44,9 @@
namespace android {
static const int64_t kMinStreamableFileSizeInBytes = 5 * 1024 * 1024;
-static const int64_t kMax32BitFileSize = 0x007fffffffLL;
+static const int64_t kMax32BitFileSize = 0x00ffffffffLL; // 2^32-1 : max FAT32
+ // filesystem file size
+ // used by most SD cards
static const uint8_t kNalUnitTypeSeqParamSet = 0x07;
static const uint8_t kNalUnitTypePicParamSet = 0x08;
static const int64_t kInitialDelayTimeUs = 700000LL;
@@ -861,11 +863,11 @@ status_t MPEG4Writer::reset() {
// Fix up the size of the 'mdat' chunk.
if (mUse32BitOffset) {
lseek64(mFd, mMdatOffset, SEEK_SET);
- int32_t size = htonl(static_cast<int32_t>(mOffset - mMdatOffset));
+ uint32_t size = htonl(static_cast<uint32_t>(mOffset - mMdatOffset));
::write(mFd, &size, 4);
} else {
lseek64(mFd, mMdatOffset + 8, SEEK_SET);
- int64_t size = mOffset - mMdatOffset;
+ uint64_t size = mOffset - mMdatOffset;
size = hton64(size);
::write(mFd, &size, 8);
}
diff --git a/media/libstagefright/OMXCodec.cpp b/media/libstagefright/OMXCodec.cpp
index a711e43..450fb3b 100644
--- a/media/libstagefright/OMXCodec.cpp
+++ b/media/libstagefright/OMXCodec.cpp
@@ -94,6 +94,7 @@ static sp<MediaSource> InstantiateSoftwareEncoder(
#define CODEC_LOGI(x, ...) ALOGI("[%s] "x, mComponentName, ##__VA_ARGS__)
#define CODEC_LOGV(x, ...) ALOGV("[%s] "x, mComponentName, ##__VA_ARGS__)
+#define CODEC_LOGW(x, ...) ALOGW("[%s] "x, mComponentName, ##__VA_ARGS__)
#define CODEC_LOGE(x, ...) ALOGE("[%s] "x, mComponentName, ##__VA_ARGS__)
struct OMXCodecObserver : public BnOMXObserver {
@@ -1779,21 +1780,42 @@ status_t OMXCodec::allocateOutputBuffersFromNativeWindow() {
strerror(-err), -err);
return err;
}
-
- // XXX: Is this the right logic to use? It's not clear to me what the OMX
- // buffer counts refer to - how do they account for the renderer holding on
- // to buffers?
- if (def.nBufferCountActual < def.nBufferCountMin + minUndequeuedBufs) {
- OMX_U32 newBufferCount = def.nBufferCountMin + minUndequeuedBufs;
+ // FIXME: assume that surface is controlled by app (native window
+ // returns the number for the case when surface is not controlled by app)
+ // FIXME2: This means that minUndeqeueudBufs can be 1 larger than reported
+ // For now, try to allocate 1 more buffer, but don't fail if unsuccessful
+
+ // Use conservative allocation while also trying to reduce starvation
+ //
+ // 1. allocate at least nBufferCountMin + minUndequeuedBuffers - that is the
+ // minimum needed for the consumer to be able to work
+ // 2. try to allocate two (2) additional buffers to reduce starvation from
+ // the consumer
+ // plus an extra buffer to account for incorrect minUndequeuedBufs
+ CODEC_LOGI("OMX-buffers: min=%u actual=%u undeq=%d+1",
+ def.nBufferCountMin, def.nBufferCountActual, minUndequeuedBufs);
+
+ for (OMX_U32 extraBuffers = 2 + 1; /* condition inside loop */; extraBuffers--) {
+ OMX_U32 newBufferCount =
+ def.nBufferCountMin + minUndequeuedBufs + extraBuffers;
def.nBufferCountActual = newBufferCount;
err = mOMX->setParameter(
mNode, OMX_IndexParamPortDefinition, &def, sizeof(def));
- if (err != OK) {
- CODEC_LOGE("setting nBufferCountActual to %lu failed: %d",
- newBufferCount, err);
+
+ if (err == OK) {
+ minUndequeuedBufs += extraBuffers;
+ break;
+ }
+
+ CODEC_LOGW("setting nBufferCountActual to %lu failed: %d",
+ newBufferCount, err);
+ /* exit condition */
+ if (extraBuffers == 0) {
return err;
}
}
+ CODEC_LOGI("OMX-buffers: min=%u actual=%u undeq=%d+1",
+ def.nBufferCountMin, def.nBufferCountActual, minUndequeuedBufs);
err = native_window_set_buffer_count(
mNativeWindow.get(), def.nBufferCountActual);
diff --git a/media/libstagefright/chromium_http/Android.mk b/media/libstagefright/chromium_http/Android.mk
index f26f386..109e3fe 100644
--- a/media/libstagefright/chromium_http/Android.mk
+++ b/media/libstagefright/chromium_http/Android.mk
@@ -18,9 +18,11 @@ LOCAL_C_INCLUDES:= \
LOCAL_CFLAGS += -Wno-multichar
LOCAL_SHARED_LIBRARIES += \
+ libbinder \
libstlport \
libchromium_net \
libutils \
+ libbinder \
libcutils \
liblog \
libstagefright_foundation \
diff --git a/media/libstagefright/chromium_http/support.cpp b/media/libstagefright/chromium_http/support.cpp
index 3b33212..3de4877 100644
--- a/media/libstagefright/chromium_http/support.cpp
+++ b/media/libstagefright/chromium_http/support.cpp
@@ -34,14 +34,97 @@
#include "net/proxy/proxy_config_service_android.h"
#include "include/ChromiumHTTPDataSource.h"
-
+#include <arpa/inet.h>
+#include <binder/Parcel.h>
#include <cutils/log.h>
#include <media/stagefright/MediaErrors.h>
#include <media/stagefright/Utils.h>
#include <string>
+#include <utils/Errors.h>
+#include <binder/IInterface.h>
+#include <binder/IServiceManager.h>
+
namespace android {
+// must be kept in sync with interface defined in IAudioService.aidl
+class IAudioService : public IInterface
+{
+public:
+ DECLARE_META_INTERFACE(AudioService);
+
+ virtual int verifyX509CertChain(
+ const std::vector<std::string>& cert_chain,
+ const std::string& hostname,
+ const std::string& auth_type) = 0;
+};
+
+class BpAudioService : public BpInterface<IAudioService>
+{
+public:
+ BpAudioService(const sp<IBinder>& impl)
+ : BpInterface<IAudioService>(impl)
+ {
+ }
+
+ virtual int verifyX509CertChain(
+ const std::vector<std::string>& cert_chain,
+ const std::string& hostname,
+ const std::string& auth_type)
+ {
+ Parcel data, reply;
+ data.writeInterfaceToken(IAudioService::getInterfaceDescriptor());
+
+ // The vector of std::string we get isn't really a vector of strings,
+ // but rather a vector of binary certificate data. If we try to pass
+ // it to Java language code as a string, it ends up mangled on the other
+ // side, so send them as bytes instead.
+ // Since we can't send an array of byte arrays, send a single array,
+ // which will be split out by the recipient.
+
+ int numcerts = cert_chain.size();
+ data.writeInt32(numcerts);
+ size_t total = 0;
+ for (int i = 0; i < numcerts; i++) {
+ total += cert_chain[i].size();
+ }
+ size_t bytesize = total + numcerts * 4;
+ uint8_t *bytes = (uint8_t*) malloc(bytesize);
+ if (!bytes) {
+ return 5; // SSL_INVALID
+ }
+ ALOGV("%d certs: %d -> %d", numcerts, total, bytesize);
+
+ int offset = 0;
+ for (int i = 0; i < numcerts; i++) {
+ int32_t certsize = cert_chain[i].size();
+ // store this in a known order, which just happens to match the default
+ // byte order of a java ByteBuffer
+ int32_t bigsize = htonl(certsize);
+ ALOGV("cert %d, size %d", i, certsize);
+ memcpy(bytes + offset, &bigsize, sizeof(bigsize));
+ offset += sizeof(bigsize);
+ memcpy(bytes + offset, cert_chain[i].data(), certsize);
+ offset += certsize;
+ }
+ data.writeByteArray(bytesize, bytes);
+ free(bytes);
+ data.writeString16(String16(hostname.c_str()));
+ data.writeString16(String16(auth_type.c_str()));
+
+ int32_t result;
+ if (remote()->transact(IBinder::FIRST_CALL_TRANSACTION, data, &reply) != NO_ERROR
+ || reply.readExceptionCode() < 0 || reply.readInt32(&result) != NO_ERROR) {
+ return 5; // SSL_INVALID;
+ }
+ return result;
+ }
+
+};
+
+IMPLEMENT_META_INTERFACE(AudioService, "android.media.IAudioService");
+
+
static Mutex gNetworkThreadLock;
static base::Thread *gNetworkThread = NULL;
static scoped_refptr<SfRequestContext> gReqContext;
@@ -226,7 +309,24 @@ SfNetworkLibrary::VerifyResult SfNetworkLibrary::VerifyX509CertChain(
const std::vector<std::string>& cert_chain,
const std::string& hostname,
const std::string& auth_type) {
- return VERIFY_OK;
+
+ sp<IBinder> binder =
+ defaultServiceManager()->checkService(String16("audio"));
+ if (binder == 0) {
+ ALOGW("Thread cannot connect to the audio service");
+ } else {
+ sp<IAudioService> service = interface_cast<IAudioService>(binder);
+ int code = service->verifyX509CertChain(cert_chain, hostname, auth_type);
+ ALOGV("verified: %d", code);
+ if (code == -1) {
+ return VERIFY_OK;
+ } else if (code == 2) { // SSL_IDMISMATCH
+ return VERIFY_BAD_HOSTNAME;
+ } else if (code == 3) { // SSL_UNTRUSTED
+ return VERIFY_NO_TRUSTED_ROOT;
+ }
+ }
+ return VERIFY_INVOCATION_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp
index 033d981..6d48ab7 100644
--- a/media/libstagefright/httplive/LiveSession.cpp
+++ b/media/libstagefright/httplive/LiveSession.cpp
@@ -37,6 +37,8 @@
#include <media/stagefright/MetaData.h>
#include <media/stagefright/Utils.h>
+#include <utils/Mutex.h>
+
#include <ctype.h>
#include <inttypes.h>
#include <openssl/aes.h>
@@ -58,32 +60,57 @@ LiveSession::LiveSession(
: 0)),
mPrevBandwidthIndex(-1),
mStreamMask(0),
+ mNewStreamMask(0),
+ mSwapMask(0),
mCheckBandwidthGeneration(0),
+ mSwitchGeneration(0),
mLastDequeuedTimeUs(0ll),
mRealTimeBaseUs(0ll),
mReconfigurationInProgress(false),
- mDisconnectReplyID(0) {
+ mSwitchInProgress(false),
+ mDisconnectReplyID(0),
+ mSeekReplyID(0) {
if (mUIDValid) {
mHTTPDataSource->setUID(mUID);
}
- mPacketSources.add(
- STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */));
-
- mPacketSources.add(
- STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */));
+ mStreams[kAudioIndex] = StreamItem("audio");
+ mStreams[kVideoIndex] = StreamItem("video");
+ mStreams[kSubtitleIndex] = StreamItem("subtitles");
- mPacketSources.add(
- STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */));
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
+ mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
+ }
}
LiveSession::~LiveSession() {
}
+sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
+ ABuffer *discontinuity = new ABuffer(0);
+ discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
+ discontinuity->meta()->setInt32("swapPacketSource", swap);
+ discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
+ discontinuity->meta()->setInt64("timeUs", -1);
+ return discontinuity;
+}
+
+void LiveSession::swapPacketSource(StreamType stream) {
+ sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
+ sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
+ sp<AnotherPacketSource> tmp = aps;
+ aps = aps2;
+ aps2 = tmp;
+ aps2->clear();
+}
+
status_t LiveSession::dequeueAccessUnit(
StreamType stream, sp<ABuffer> *accessUnit) {
if (!(mStreamMask & stream)) {
- return UNKNOWN_ERROR;
+ // return -EWOULDBLOCK to avoid halting the decoder
+ // when switching between audio/video and audio only.
+ return -EWOULDBLOCK;
}
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
@@ -123,6 +150,25 @@ status_t LiveSession::dequeueAccessUnit(
streamStr,
type,
extra == NULL ? "NULL" : extra->debugString().c_str());
+
+ int32_t swap;
+ if (type == ATSParser::DISCONTINUITY_FORMATCHANGE
+ && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap)
+ && swap) {
+
+ int32_t switchGeneration;
+ CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
+ {
+ Mutex::Autolock lock(mSwapMutex);
+ if (switchGeneration == mSwitchGeneration) {
+ swapPacketSource(stream);
+ sp<AMessage> msg = new AMessage(kWhatSwapped, id());
+ msg->setInt32("stream", stream);
+ msg->setInt32("switchGeneration", switchGeneration);
+ msg->post();
+ }
+ }
+ }
} else if (err == OK) {
if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
int64_t timeUs;
@@ -144,6 +190,7 @@ status_t LiveSession::dequeueAccessUnit(
}
status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
+ // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
if (!(mStreamMask & stream)) {
return UNKNOWN_ERROR;
}
@@ -189,6 +236,10 @@ status_t LiveSession::seekTo(int64_t timeUs) {
sp<AMessage> response;
status_t err = msg->postAndAwaitResponse(&response);
+ uint32_t replyID;
+ CHECK(response == mSeekReply && 0 != mSeekReplyID);
+ mSeekReply.clear();
+ mSeekReplyID = 0;
return err;
}
@@ -214,15 +265,12 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
case kWhatSeek:
{
- uint32_t replyID;
- CHECK(msg->senderAwaitsResponse(&replyID));
+ CHECK(msg->senderAwaitsResponse(&mSeekReplyID));
status_t err = onSeek(msg);
- sp<AMessage> response = new AMessage;
- response->setInt32("err", err);
-
- response->postReply(replyID);
+ mSeekReply = new AMessage;
+ mSeekReply->setInt32("err", err);
break;
}
@@ -240,13 +288,23 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
if (what == PlaylistFetcher::kWhatStopped) {
AString uri;
CHECK(msg->findString("uri", &uri));
- mFetcherInfos.removeItem(uri);
+ if (mFetcherInfos.removeItem(uri) < 0) {
+ // ignore duplicated kWhatStopped messages.
+ break;
+ }
+
+ tryToFinishBandwidthSwitch();
}
if (mContinuation != NULL) {
CHECK_GT(mContinuationCounter, 0);
if (--mContinuationCounter == 0) {
mContinuation->post();
+
+ if (mSeekReplyID != 0) {
+ CHECK(mSeekReply != NULL);
+ mSeekReply->postReply(mSeekReplyID);
+ }
}
}
break;
@@ -276,6 +334,8 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
postPrepared(err);
}
+ cancelBandwidthSwitch();
+
mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
@@ -314,6 +374,27 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
+ case PlaylistFetcher::kWhatStartedAt:
+ {
+ int32_t switchGeneration;
+ CHECK(msg->findInt32("switchGeneration", &switchGeneration));
+
+ if (switchGeneration != mSwitchGeneration) {
+ break;
+ }
+
+ // Resume fetcher for the original variant; the resumed fetcher should
+ // continue until the timestamps found in msg, which is stored by the
+ // new fetcher to indicate where the new variant has started buffering.
+ for (size_t i = 0; i < mFetcherInfos.size(); i++) {
+ const FetcherInfo info = mFetcherInfos.valueAt(i);
+ if (info.mToBeRemoved) {
+ info.mFetcher->resumeUntilAsync(msg);
+ }
+ }
+ break;
+ }
+
default:
TRESPASS();
}
@@ -358,6 +439,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
break;
}
+ case kWhatSwapped:
+ {
+ onSwapped(msg);
+ break;
+ }
default:
TRESPASS();
break;
@@ -375,6 +461,12 @@ int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b)
return 1;
}
+// static
+LiveSession::StreamType LiveSession::indexToType(int idx) {
+ CHECK(idx >= 0 && idx < kMaxStreams);
+ return (StreamType)(1 << idx);
+}
+
void LiveSession::onConnect(const sp<AMessage> &msg) {
AString url;
CHECK(msg->findString("url", &url));
@@ -462,6 +554,10 @@ void LiveSession::finishDisconnect() {
// during disconnection either.
cancelCheckBandwidthEvent();
+ // Protect mPacketSources from a swapPacketSource race condition through disconnect.
+ // (finishDisconnect, onFinishDisconnect2)
+ cancelBandwidthSwitch();
+
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
mFetcherInfos.valueAt(i).mFetcher->stopAsync();
}
@@ -501,11 +597,13 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
notify->setString("uri", uri);
+ notify->setInt32("switchGeneration", mSwitchGeneration);
FetcherInfo info;
info.mFetcher = new PlaylistFetcher(notify, this, uri);
info.mDurationUs = -1ll;
info.mIsPrepared = false;
+ info.mToBeRemoved = false;
looper()->registerHandler(info.mFetcher);
mFetcherInfos.add(uri, info);
@@ -513,54 +611,82 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
return info.mFetcher;
}
-status_t LiveSession::fetchFile(
+/*
+ * Illustration of parameters:
+ *
+ * 0 `range_offset`
+ * +------------+-------------------------------------------------------+--+--+
+ * | | | next block to fetch | | |
+ * | | `source` handle => `out` buffer | | | |
+ * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | |
+ * | |<----------- `range_length` / buffer capacity ----------->| |
+ * |<------------------------------ file_size ------------------------------->|
+ *
+ * Special parameter values:
+ * - range_length == -1 means entire file
+ * - block_size == 0 means entire range
+ *
+ */
+ssize_t LiveSession::fetchFile(
const char *url, sp<ABuffer> *out,
int64_t range_offset, int64_t range_length,
+ uint32_t block_size, /* download block size */
+ sp<DataSource> *source, /* to return and reuse source */
String8 *actualUrl) {
- *out = NULL;
+ off64_t size;
+ sp<DataSource> temp_source;
+ if (source == NULL) {
+ source = &temp_source;
+ }
- sp<DataSource> source;
+ if (*source == NULL) {
+ if (!strncasecmp(url, "file://", 7)) {
+ *source = new FileSource(url + 7);
+ } else if (strncasecmp(url, "http://", 7)
+ && strncasecmp(url, "https://", 8)) {
+ return ERROR_UNSUPPORTED;
+ } else {
+ KeyedVector<String8, String8> headers = mExtraHeaders;
+ if (range_offset > 0 || range_length >= 0) {
+ headers.add(
+ String8("Range"),
+ String8(
+ StringPrintf(
+ "bytes=%lld-%s",
+ range_offset,
+ range_length < 0
+ ? "" : StringPrintf("%lld",
+ range_offset + range_length - 1).c_str()).c_str()));
+ }
+ status_t err = mHTTPDataSource->connect(url, &headers);
- if (!strncasecmp(url, "file://", 7)) {
- source = new FileSource(url + 7);
- } else if (strncasecmp(url, "http://", 7)
- && strncasecmp(url, "https://", 8)) {
- return ERROR_UNSUPPORTED;
- } else {
- KeyedVector<String8, String8> headers = mExtraHeaders;
- if (range_offset > 0 || range_length >= 0) {
- headers.add(
- String8("Range"),
- String8(
- StringPrintf(
- "bytes=%lld-%s",
- range_offset,
- range_length < 0
- ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
- }
- status_t err = mHTTPDataSource->connect(url, &headers);
+ if (err != OK) {
+ return err;
+ }
- if (err != OK) {
- return err;
+ *source = mHTTPDataSource;
}
-
- source = mHTTPDataSource;
}
- off64_t size;
- status_t err = source->getSize(&size);
-
- if (err != OK) {
+ status_t getSizeErr = (*source)->getSize(&size);
+ if (getSizeErr != OK) {
size = 65536;
}
- sp<ABuffer> buffer = new ABuffer(size);
- buffer->setRange(0, 0);
+ sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
+ if (*out == NULL) {
+ buffer->setRange(0, 0);
+ }
+ ssize_t bytesRead = 0;
+ // adjust range_length if only reading partial block
+ if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) {
+ range_length = buffer->size() + block_size;
+ }
for (;;) {
+ // Only resize when we don't know the size.
size_t bufferRemaining = buffer->capacity() - buffer->size();
-
- if (bufferRemaining == 0) {
+ if (bufferRemaining == 0 && getSizeErr != OK) {
bufferRemaining = 32768;
ALOGV("increasing download buffer to %zu bytes",
@@ -585,7 +711,9 @@ status_t LiveSession::fetchFile(
}
}
- ssize_t n = source->readAt(
+ // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
+ // to help us break out of the loop.
+ ssize_t n = (*source)->readAt(
buffer->size(), buffer->data() + buffer->size(),
maxBytesToRead);
@@ -598,17 +726,18 @@ status_t LiveSession::fetchFile(
}
buffer->setRange(0, buffer->size() + (size_t)n);
+ bytesRead += n;
}
*out = buffer;
if (actualUrl != NULL) {
- *actualUrl = source->getUri();
+ *actualUrl = (*source)->getUri();
if (actualUrl->isEmpty()) {
*actualUrl = url;
}
}
- return OK;
+ return bytesRead;
}
sp<M3UParser> LiveSession::fetchPlaylist(
@@ -619,9 +748,9 @@ sp<M3UParser> LiveSession::fetchPlaylist(
sp<ABuffer> buffer;
String8 actualUrl;
- status_t err = fetchFile(url, &buffer, 0, -1, &actualUrl);
+ ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
- if (err != OK) {
+ if (err <= 0) {
return NULL;
}
@@ -816,8 +945,25 @@ status_t LiveSession::selectTrack(size_t index, bool select) {
return err;
}
+bool LiveSession::canSwitchUp() {
+ // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
+ status_t err = OK;
+ for (size_t i = 0; i < mPacketSources.size(); ++i) {
+ sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
+ int64_t dur = source->getBufferedDurationUs(&err);
+ if (err == OK && dur > 10000000) {
+ return true;
+ }
+ }
+ return false;
+}
+
void LiveSession::changeConfiguration(
int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
+ // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
+ // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
+ cancelBandwidthSwitch();
+
CHECK(!mReconfigurationInProgress);
mReconfigurationInProgress = true;
@@ -833,21 +979,14 @@ void LiveSession::changeConfiguration(
CHECK_LT(bandwidthIndex, mBandwidthItems.size());
const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
- uint32_t streamMask = 0;
-
- AString audioURI;
- if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) {
- streamMask |= STREAMTYPE_AUDIO;
- }
-
- AString videoURI;
- if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) {
- streamMask |= STREAMTYPE_VIDEO;
- }
+ uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
+ uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
- AString subtitleURI;
- if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) {
- streamMask |= STREAMTYPE_SUBTITLES;
+ AString URIs[kMaxStreams];
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
+ streamMask |= indexToType(i);
+ }
}
// Step 1, stop and discard fetchers that are no longer needed.
@@ -859,10 +998,15 @@ void LiveSession::changeConfiguration(
// If we're seeking all current fetchers are discarded.
if (timeUs < 0ll) {
- if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI)
- || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI)
- || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) {
- discardFetcher = false;
+ // delay fetcher removal
+ discardFetcher = false;
+
+ for (size_t j = 0; j < kMaxStreams; ++j) {
+ StreamType type = indexToType(j);
+ if ((streamMask & type) && uri == URIs[j]) {
+ resumeMask |= type;
+ streamMask &= ~type;
+ }
}
}
@@ -873,17 +1017,20 @@ void LiveSession::changeConfiguration(
}
}
- sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
+ sp<AMessage> msg;
+ if (timeUs < 0ll) {
+ // skip onChangeConfiguration2 (decoder destruction) if switching.
+ msg = new AMessage(kWhatChangeConfiguration3, id());
+ } else {
+ msg = new AMessage(kWhatChangeConfiguration2, id());
+ }
msg->setInt32("streamMask", streamMask);
+ msg->setInt32("resumeMask", resumeMask);
msg->setInt64("timeUs", timeUs);
- if (streamMask & STREAMTYPE_AUDIO) {
- msg->setString("audioURI", audioURI.c_str());
- }
- if (streamMask & STREAMTYPE_VIDEO) {
- msg->setString("videoURI", videoURI.c_str());
- }
- if (streamMask & STREAMTYPE_SUBTITLES) {
- msg->setString("subtitleURI", subtitleURI.c_str());
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if (streamMask & indexToType(i)) {
+ msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
+ }
}
// Every time a fetcher acknowledges the stopAsync or pauseAsync request
@@ -895,6 +1042,11 @@ void LiveSession::changeConfiguration(
if (mContinuationCounter == 0) {
msg->post();
+
+ if (mSeekReplyID != 0) {
+ CHECK(mSeekReply != NULL);
+ mSeekReply->postReply(mSeekReplyID);
+ }
}
}
@@ -914,18 +1066,13 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
uint32_t streamMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
- AString audioURI, videoURI, subtitleURI;
- if (streamMask & STREAMTYPE_AUDIO) {
- CHECK(msg->findString("audioURI", &audioURI));
- ALOGV("audioURI = '%s'", audioURI.c_str());
- }
- if (streamMask & STREAMTYPE_VIDEO) {
- CHECK(msg->findString("videoURI", &videoURI));
- ALOGV("videoURI = '%s'", videoURI.c_str());
- }
- if (streamMask & STREAMTYPE_SUBTITLES) {
- CHECK(msg->findString("subtitleURI", &subtitleURI));
- ALOGV("subtitleURI = '%s'", subtitleURI.c_str());
+ AString URIs[kMaxStreams];
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if (streamMask & indexToType(i)) {
+ const AString &uriKey = mStreams[i].uriKey();
+ CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
+ ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
+ }
}
// Determine which decoders to shutdown on the player side,
@@ -935,15 +1082,12 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
// 2) its streamtype was already active and still is but the URI
// has changed.
uint32_t changedMask = 0;
- if (((mStreamMask & streamMask & STREAMTYPE_AUDIO)
- && !(audioURI == mAudioURI))
- || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) {
- changedMask |= STREAMTYPE_AUDIO;
- }
- if (((mStreamMask & streamMask & STREAMTYPE_VIDEO)
- && !(videoURI == mVideoURI))
- || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) {
- changedMask |= STREAMTYPE_VIDEO;
+ for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
+ if (((mStreamMask & streamMask & indexToType(i))
+ && !(URIs[i] == mStreams[i].mUri))
+ || (mStreamMask & ~streamMask & indexToType(i))) {
+ changedMask |= indexToType(i);
+ }
}
if (changedMask == 0) {
@@ -969,68 +1113,54 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
}
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
+ mContinuation.clear();
// All remaining fetchers are still suspended, the player has shutdown
// any decoders that needed it.
- uint32_t streamMask;
+ uint32_t streamMask, resumeMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
+ CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
- AString audioURI, videoURI, subtitleURI;
- if (streamMask & STREAMTYPE_AUDIO) {
- CHECK(msg->findString("audioURI", &audioURI));
- }
- if (streamMask & STREAMTYPE_VIDEO) {
- CHECK(msg->findString("videoURI", &videoURI));
- }
- if (streamMask & STREAMTYPE_SUBTITLES) {
- CHECK(msg->findString("subtitleURI", &subtitleURI));
+ for (size_t i = 0; i < kMaxStreams; ++i) {
+ if (streamMask & indexToType(i)) {
+ CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
+ }
}
int64_t timeUs;
+ bool switching = false;
CHECK(msg->findInt64("timeUs", &timeUs));
if (timeUs < 0ll) {
timeUs = mLastDequeuedTimeUs;
+ switching = true;
}
mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
- mStreamMask = streamMask;
- mAudioURI = audioURI;
- mVideoURI = videoURI;
- mSubtitleURI = subtitleURI;
+ mNewStreamMask = streamMask;
- // Resume all existing fetchers and assign them packet sources.
+ // Of all existing fetchers:
+ // * Resume fetchers that are still needed and assign them original packet sources.
+ // * Mark otherwise unneeded fetchers for removal.
+ ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
const AString &uri = mFetcherInfos.keyAt(i);
- uint32_t resumeMask = 0;
-
- sp<AnotherPacketSource> audioSource;
- if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
- audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
- resumeMask |= STREAMTYPE_AUDIO;
- }
-
- sp<AnotherPacketSource> videoSource;
- if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
- videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
- resumeMask |= STREAMTYPE_VIDEO;
+ sp<AnotherPacketSource> sources[kMaxStreams];
+ for (size_t j = 0; j < kMaxStreams; ++j) {
+ if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
+ sources[j] = mPacketSources.valueFor(indexToType(j));
+ }
}
- sp<AnotherPacketSource> subtitleSource;
- if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
- subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
- resumeMask |= STREAMTYPE_SUBTITLES;
+ FetcherInfo &info = mFetcherInfos.editValueAt(i);
+ if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
+ || sources[kSubtitleIndex] != NULL) {
+ info.mFetcher->startAsync(
+ sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
+ } else {
+ info.mToBeRemoved = true;
}
-
- CHECK_NE(resumeMask, 0u);
-
- ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
-
- streamMask &= ~resumeMask;
-
- mFetcherInfos.valueAt(i).mFetcher->startAsync(
- audioSource, videoSource, subtitleSource);
}
// streamMask now only contains the types that need a new fetcher created.
@@ -1039,52 +1169,65 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
ALOGV("creating new fetchers for mask 0x%08x", streamMask);
}
- while (streamMask != 0) {
- StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1));
+ // Find out when the original fetchers have buffered up to and start the new fetchers
+ // at a later timestamp.
+ for (size_t i = 0; i < kMaxStreams; i++) {
+ if (!(indexToType(i) & streamMask)) {
+ continue;
+ }
AString uri;
- switch (streamType) {
- case STREAMTYPE_AUDIO:
- uri = audioURI;
- break;
- case STREAMTYPE_VIDEO:
- uri = videoURI;
- break;
- case STREAMTYPE_SUBTITLES:
- uri = subtitleURI;
- break;
- default:
- TRESPASS();
- }
+ uri = mStreams[i].mUri;
sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
CHECK(fetcher != NULL);
- sp<AnotherPacketSource> audioSource;
- if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
- audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
- audioSource->clear();
-
- streamMask &= ~STREAMTYPE_AUDIO;
- }
-
- sp<AnotherPacketSource> videoSource;
- if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
- videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
- videoSource->clear();
-
- streamMask &= ~STREAMTYPE_VIDEO;
- }
+ int32_t latestSeq = -1;
+ int64_t latestTimeUs = 0ll;
+ sp<AnotherPacketSource> sources[kMaxStreams];
+
+ // TRICKY: looping from i as earlier streams are already removed from streamMask
+ for (size_t j = i; j < kMaxStreams; ++j) {
+ if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
+ sources[j] = mPacketSources.valueFor(indexToType(j));
+
+ if (!switching) {
+ sources[j]->clear();
+ } else {
+ int32_t type, seq;
+ int64_t srcTimeUs;
+ sp<AMessage> meta = sources[j]->getLatestMeta();
+
+ if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
+ CHECK(meta->findInt32("seq", &seq));
+ if (seq > latestSeq) {
+ latestSeq = seq;
+ }
+ CHECK(meta->findInt64("timeUs", &srcTimeUs));
+ if (srcTimeUs > latestTimeUs) {
+ latestTimeUs = srcTimeUs;
+ }
+ }
- sp<AnotherPacketSource> subtitleSource;
- if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
- subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
- subtitleSource->clear();
+ sources[j] = mPacketSources2.valueFor(indexToType(j));
+ sources[j]->clear();
+ uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
+ if (extraStreams & indexToType(j)) {
+ sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
+ }
+ }
- streamMask &= ~STREAMTYPE_SUBTITLES;
+ streamMask &= ~indexToType(j);
+ }
}
- fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs);
+ fetcher->startAsync(
+ sources[kAudioIndex],
+ sources[kVideoIndex],
+ sources[kSubtitleIndex],
+ timeUs,
+ latestTimeUs /* min start time(us) */,
+ latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
}
// All fetchers have now been started, the configuration change
@@ -1093,14 +1236,61 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
scheduleCheckBandwidthEvent();
ALOGV("XXX configuration change completed.");
-
mReconfigurationInProgress = false;
+ if (switching) {
+ mSwitchInProgress = true;
+ } else {
+ mStreamMask = mNewStreamMask;
+ }
if (mDisconnectReplyID != 0) {
finishDisconnect();
}
}
+void LiveSession::onSwapped(const sp<AMessage> &msg) {
+ int32_t switchGeneration;
+ CHECK(msg->findInt32("switchGeneration", &switchGeneration));
+ if (switchGeneration != mSwitchGeneration) {
+ return;
+ }
+
+ int32_t stream;
+ CHECK(msg->findInt32("stream", &stream));
+ mSwapMask |= stream;
+ if (mSwapMask != mStreamMask) {
+ return;
+ }
+
+ // Check if new variant contains extra streams.
+ uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
+ while (extraStreams) {
+ StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
+ swapPacketSource(extraStream);
+ extraStreams &= ~extraStream;
+ }
+
+ tryToFinishBandwidthSwitch();
+}
+
+// Mark switch done when:
+// 1. all old buffers are swapped out, AND
+// 2. all old fetchers are removed.
+void LiveSession::tryToFinishBandwidthSwitch() {
+ bool needToRemoveFetchers = false;
+ for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
+ if (mFetcherInfos.valueAt(i).mToBeRemoved) {
+ needToRemoveFetchers = true;
+ break;
+ }
+ }
+ if (!needToRemoveFetchers && mSwapMask == mStreamMask) {
+ mStreamMask = mNewStreamMask;
+ mSwitchInProgress = false;
+ mSwapMask = 0;
+ }
+}
+
void LiveSession::scheduleCheckBandwidthEvent() {
sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
msg->setInt32("generation", mCheckBandwidthGeneration);
@@ -1111,16 +1301,37 @@ void LiveSession::cancelCheckBandwidthEvent() {
++mCheckBandwidthGeneration;
}
-void LiveSession::onCheckBandwidth() {
- if (mReconfigurationInProgress) {
- scheduleCheckBandwidthEvent();
- return;
+void LiveSession::cancelBandwidthSwitch() {
+ Mutex::Autolock lock(mSwapMutex);
+ mSwitchGeneration++;
+ mSwitchInProgress = false;
+ mSwapMask = 0;
+}
+
+bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
+ if (mReconfigurationInProgress || mSwitchInProgress) {
+ return false;
}
+ if (mPrevBandwidthIndex < 0) {
+ return true;
+ }
+
+ if (bandwidthIndex == (size_t)mPrevBandwidthIndex) {
+ return false;
+ } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) {
+ return canSwitchUp();
+ } else {
+ return true;
+ }
+}
+
+void LiveSession::onCheckBandwidth() {
size_t bandwidthIndex = getBandwidthIndex();
- if (mPrevBandwidthIndex < 0
- || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
+ if (canSwitchBandwidthTo(bandwidthIndex)) {
changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
+ } else {
+ scheduleCheckBandwidthEvent();
}
// Handling the kWhatCheckBandwidth even here does _not_ automatically
diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h
index 8f6a4ea..3f8fee5 100644
--- a/media/libstagefright/httplive/LiveSession.h
+++ b/media/libstagefright/httplive/LiveSession.h
@@ -42,10 +42,17 @@ struct LiveSession : public AHandler {
const sp<AMessage> &notify,
uint32_t flags = 0, bool uidValid = false, uid_t uid = 0);
+ enum StreamIndex {
+ kAudioIndex = 0,
+ kVideoIndex = 1,
+ kSubtitleIndex = 2,
+ kMaxStreams = 3,
+ };
+
enum StreamType {
- STREAMTYPE_AUDIO = 1,
- STREAMTYPE_VIDEO = 2,
- STREAMTYPE_SUBTITLES = 4,
+ STREAMTYPE_AUDIO = 1 << kAudioIndex,
+ STREAMTYPE_VIDEO = 1 << kVideoIndex,
+ STREAMTYPE_SUBTITLES = 1 << kSubtitleIndex,
};
status_t dequeueAccessUnit(StreamType stream, sp<ABuffer> *accessUnit);
@@ -74,6 +81,11 @@ struct LiveSession : public AHandler {
kWhatPreparationFailed,
};
+ // create a format-change discontinuity
+ //
+ // swap:
+ // whether is format-change discontinuity should trigger a buffer swap
+ sp<ABuffer> createFormatChangeBuffer(bool swap = true);
protected:
virtual ~LiveSession();
@@ -92,6 +104,7 @@ private:
kWhatChangeConfiguration2 = 'chC2',
kWhatChangeConfiguration3 = 'chC3',
kWhatFinishDisconnect2 = 'fin2',
+ kWhatSwapped = 'swap',
};
struct BandwidthItem {
@@ -103,8 +116,22 @@ private:
sp<PlaylistFetcher> mFetcher;
int64_t mDurationUs;
bool mIsPrepared;
+ bool mToBeRemoved;
};
+ struct StreamItem {
+ const char *mType;
+ AString mUri;
+ StreamItem() : mType("") {}
+ StreamItem(const char *type) : mType(type) {}
+ AString uriKey() {
+ AString key(mType);
+ key.append("URI");
+ return key;
+ }
+ };
+ StreamItem mStreams[kMaxStreams];
+
sp<AMessage> mNotify;
uint32_t mFlags;
bool mUIDValid;
@@ -123,21 +150,40 @@ private:
sp<M3UParser> mPlaylist;
KeyedVector<AString, FetcherInfo> mFetcherInfos;
- AString mAudioURI, mVideoURI, mSubtitleURI;
uint32_t mStreamMask;
+ // Masks used during reconfiguration:
+ // mNewStreamMask: streams in the variant playlist we're switching to;
+ // we don't want to immediately overwrite the original value.
+ uint32_t mNewStreamMask;
+
+ // mSwapMask: streams that have started to playback content in the new variant playlist;
+ // we use this to track reconfiguration progress.
+ uint32_t mSwapMask;
+
KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources;
+ // A second set of packet sources that buffer content for the variant we're switching to.
+ KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2;
+
+ // A mutex used to serialize two sets of events:
+ // * the swapping of packet sources in dequeueAccessUnit on the player thread, AND
+ // * a forced bandwidth switch termination in cancelSwitch on the live looper.
+ Mutex mSwapMutex;
int32_t mCheckBandwidthGeneration;
+ int32_t mSwitchGeneration;
size_t mContinuationCounter;
sp<AMessage> mContinuation;
+ sp<AMessage> mSeekReply;
int64_t mLastDequeuedTimeUs;
int64_t mRealTimeBaseUs;
bool mReconfigurationInProgress;
+ bool mSwitchInProgress;
uint32_t mDisconnectReplyID;
+ uint32_t mSeekReplyID;
sp<PlaylistFetcher> addFetcher(const char *uri);
@@ -145,9 +191,25 @@ private:
status_t onSeek(const sp<AMessage> &msg);
void onFinishDisconnect2();
- status_t fetchFile(
+ // If given a non-zero block_size (default 0), it is used to cap the number of
+ // bytes read in from the DataSource. If given a non-NULL buffer, new content
+ // is read into the end.
+ //
+ // The DataSource we read from is responsible for signaling error or EOF to help us
+ // break out of the read loop. The DataSource can be returned to the caller, so
+ // that the caller can reuse it for subsequent fetches (within the initially
+ // requested range).
+ //
+ // For reused HTTP sources, the caller must download a file sequentially without
+ // any overlaps or gaps to prevent reconnection.
+ ssize_t fetchFile(
const char *url, sp<ABuffer> *out,
+ /* request/open a file starting at range_offset for range_length bytes */
int64_t range_offset = 0, int64_t range_length = -1,
+ /* download block size */
+ uint32_t block_size = 0,
+ /* reuse DataSource if doing partial fetch */
+ sp<DataSource> *source = NULL,
String8 *actualUrl = NULL);
sp<M3UParser> fetchPlaylist(
@@ -156,22 +218,34 @@ private:
size_t getBandwidthIndex();
static int SortByBandwidth(const BandwidthItem *, const BandwidthItem *);
+ static StreamType indexToType(int idx);
void changeConfiguration(
int64_t timeUs, size_t bandwidthIndex, bool pickTrack = false);
void onChangeConfiguration(const sp<AMessage> &msg);
void onChangeConfiguration2(const sp<AMessage> &msg);
void onChangeConfiguration3(const sp<AMessage> &msg);
+ void onSwapped(const sp<AMessage> &msg);
+ void tryToFinishBandwidthSwitch();
void scheduleCheckBandwidthEvent();
void cancelCheckBandwidthEvent();
+ // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources
+ // from being swapped out on stale discontinuities while manipulating
+ // mPacketSources/mPacketSources2.
+ void cancelBandwidthSwitch();
+
+ bool canSwitchBandwidthTo(size_t bandwidthIndex);
void onCheckBandwidth();
void finishDisconnect();
void postPrepared(status_t err);
+ void swapPacketSource(StreamType stream);
+ bool canSwitchUp();
+
DISALLOW_EVIL_CONSTRUCTORS(LiveSession);
};
diff --git a/media/libstagefright/httplive/M3UParser.cpp b/media/libstagefright/httplive/M3UParser.cpp
index b2a7010..20c3a76 100644
--- a/media/libstagefright/httplive/M3UParser.cpp
+++ b/media/libstagefright/httplive/M3UParser.cpp
@@ -24,6 +24,7 @@
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>
+#include <media/stagefright/Utils.h>
#include <media/mediaplayer.h>
namespace android {
@@ -352,9 +353,28 @@ bool M3UParser::getTypeURI(size_t index, const char *key, AString *uri) const {
if (!meta->findString(key, &groupID)) {
*uri = mItems.itemAt(index).mURI;
- // Assume media without any more specific attribute contains
- // audio and video, but no subtitles.
- return !strcmp("audio", key) || !strcmp("video", key);
+ AString codecs;
+ if (!meta->findString("codecs", &codecs)) {
+ // Assume media without any more specific attribute contains
+ // audio and video, but no subtitles.
+ return !strcmp("audio", key) || !strcmp("video", key);
+ } else {
+ // Split the comma separated list of codecs.
+ size_t offset = 0;
+ ssize_t commaPos = -1;
+ codecs.append(',');
+ while ((commaPos = codecs.find(",", offset)) >= 0) {
+ AString codec(codecs, offset, commaPos - offset);
+ codec.trim();
+ // return true only if a codec of type `key` ("audio"/"video")
+ // is found.
+ if (codecIsType(codec, key)) {
+ return true;
+ }
+ offset = commaPos + 1;
+ }
+ return false;
+ }
}
sp<MediaGroup> group = mMediaGroups.valueFor(groupID);
@@ -369,18 +389,6 @@ bool M3UParser::getTypeURI(size_t index, const char *key, AString *uri) const {
return true;
}
-bool M3UParser::getAudioURI(size_t index, AString *uri) const {
- return getTypeURI(index, "audio", uri);
-}
-
-bool M3UParser::getVideoURI(size_t index, AString *uri) const {
- return getTypeURI(index, "video", uri);
-}
-
-bool M3UParser::getSubtitleURI(size_t index, AString *uri) const {
- return getTypeURI(index, "subtitles", uri);
-}
-
static bool MakeURL(const char *baseURL, const char *url, AString *out) {
out->clear();
@@ -694,12 +702,22 @@ status_t M3UParser::parseStreamInf(
*meta = new AMessage;
}
(*meta)->setInt32("bandwidth", x);
+ } else if (!strcasecmp("codecs", key.c_str())) {
+ if (!isQuotedString(val)) {
+ ALOGE("Expected quoted string for %s attribute, "
+ "got '%s' instead.",
+ key.c_str(), val.c_str());;
+
+ return ERROR_MALFORMED;
+ }
+
+ key.tolower();
+ const AString &codecs = unquoteString(val);
+ (*meta)->setString(key.c_str(), codecs.c_str());
} else if (!strcasecmp("audio", key.c_str())
|| !strcasecmp("video", key.c_str())
|| !strcasecmp("subtitles", key.c_str())) {
- if (val.size() < 2
- || val.c_str()[0] != '"'
- || val.c_str()[val.size() - 1] != '"') {
+ if (!isQuotedString(val)) {
ALOGE("Expected quoted string for %s attribute, "
"got '%s' instead.",
key.c_str(), val.c_str());
@@ -707,7 +725,7 @@ status_t M3UParser::parseStreamInf(
return ERROR_MALFORMED;
}
- AString groupID(val, 1, val.size() - 2);
+ const AString &groupID = unquoteString(val);
ssize_t groupIndex = mMediaGroups.indexOfKey(groupID);
if (groupIndex < 0) {
@@ -1095,4 +1113,121 @@ status_t M3UParser::ParseDouble(const char *s, double *x) {
return OK;
}
+// static
+bool M3UParser::isQuotedString(const AString &str) {
+ if (str.size() < 2
+ || str.c_str()[0] != '"'
+ || str.c_str()[str.size() - 1] != '"') {
+ return false;
+ }
+ return true;
+}
+
+// static
+AString M3UParser::unquoteString(const AString &str) {
+ if (!isQuotedString(str)) {
+ return str;
+ }
+ return AString(str, 1, str.size() - 2);
+}
+
+// static
+bool M3UParser::codecIsType(const AString &codec, const char *type) {
+ if (codec.size() < 4) {
+ return false;
+ }
+ const char *c = codec.c_str();
+ switch (FOURCC(c[0], c[1], c[2], c[3])) {
+ // List extracted from http://www.mp4ra.org/codecs.html
+ case 'ac-3':
+ case 'alac':
+ case 'dra1':
+ case 'dtsc':
+ case 'dtse':
+ case 'dtsh':
+ case 'dtsl':
+ case 'ec-3':
+ case 'enca':
+ case 'g719':
+ case 'g726':
+ case 'm4ae':
+ case 'mlpa':
+ case 'mp4a':
+ case 'raw ':
+ case 'samr':
+ case 'sawb':
+ case 'sawp':
+ case 'sevc':
+ case 'sqcp':
+ case 'ssmv':
+ case 'twos':
+ case 'agsm':
+ case 'alaw':
+ case 'dvi ':
+ case 'fl32':
+ case 'fl64':
+ case 'ima4':
+ case 'in24':
+ case 'in32':
+ case 'lpcm':
+ case 'Qclp':
+ case 'QDM2':
+ case 'QDMC':
+ case 'ulaw':
+ case 'vdva':
+ return !strcmp("audio", type);
+
+ case 'avc1':
+ case 'avc2':
+ case 'avcp':
+ case 'drac':
+ case 'encv':
+ case 'mjp2':
+ case 'mp4v':
+ case 'mvc1':
+ case 'mvc2':
+ case 'resv':
+ case 's263':
+ case 'svc1':
+ case 'vc-1':
+ case 'CFHD':
+ case 'civd':
+ case 'DV10':
+ case 'dvh5':
+ case 'dvh6':
+ case 'dvhp':
+ case 'DVOO':
+ case 'DVOR':
+ case 'DVTV':
+ case 'DVVT':
+ case 'flic':
+ case 'gif ':
+ case 'h261':
+ case 'h263':
+ case 'HD10':
+ case 'jpeg':
+ case 'M105':
+ case 'mjpa':
+ case 'mjpb':
+ case 'png ':
+ case 'PNTG':
+ case 'rle ':
+ case 'rpza':
+ case 'Shr0':
+ case 'Shr1':
+ case 'Shr2':
+ case 'Shr3':
+ case 'Shr4':
+ case 'SVQ1':
+ case 'SVQ3':
+ case 'tga ':
+ case 'tiff':
+ case 'WRLE':
+ return !strcmp("video", type);
+
+ default:
+ return false;
+ }
+}
+
} // namespace android
diff --git a/media/libstagefright/httplive/M3UParser.h b/media/libstagefright/httplive/M3UParser.h
index 5248004..ccd6556 100644
--- a/media/libstagefright/httplive/M3UParser.h
+++ b/media/libstagefright/httplive/M3UParser.h
@@ -45,9 +45,7 @@ struct M3UParser : public RefBase {
status_t getTrackInfo(Parcel* reply) const;
ssize_t getSelectedIndex() const;
- bool getAudioURI(size_t index, AString *uri) const;
- bool getVideoURI(size_t index, AString *uri) const;
- bool getSubtitleURI(size_t index, AString *uri) const;
+ bool getTypeURI(size_t index, const char *key, AString *uri) const;
protected:
virtual ~M3UParser();
@@ -95,11 +93,13 @@ private:
status_t parseMedia(const AString &line);
- bool getTypeURI(size_t index, const char *key, AString *uri) const;
-
static status_t ParseInt32(const char *s, int32_t *x);
static status_t ParseDouble(const char *s, double *x);
+ static bool isQuotedString(const AString &str);
+ static AString unquoteString(const AString &str);
+ static bool codecIsType(const AString &codec, const char *type);
+
DISALLOW_EVIL_CONSTRUCTORS(M3UParser);
};
diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp
index 57bf7db..513f114 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.cpp
+++ b/media/libstagefright/httplive/PlaylistFetcher.cpp
@@ -48,26 +48,35 @@ namespace android {
// static
const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
+const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
+const int32_t PlaylistFetcher::kDownloadBlockSize = 192;
+const int32_t PlaylistFetcher::kNumSkipFrames = 10;
PlaylistFetcher::PlaylistFetcher(
const sp<AMessage> &notify,
const sp<LiveSession> &session,
const char *uri)
: mNotify(notify),
+ mStartTimeUsNotify(notify->dup()),
mSession(session),
mURI(uri),
mStreamTypeMask(0),
mStartTimeUs(-1ll),
+ mMinStartTimeUs(0ll),
+ mStopParams(NULL),
mLastPlaylistFetchTimeUs(-1ll),
mSeqNumber(-1),
mNumRetries(0),
mStartup(true),
+ mPrepared(false),
mNextPTSTimeUs(-1ll),
mMonitorQueueGeneration(0),
mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
mFirstPTSValid(false),
mAbsoluteTimeAnchorUs(0ll) {
memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
+ mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
+ mStartTimeUsNotify->setInt32("streamMask", 0);
}
PlaylistFetcher::~PlaylistFetcher() {
@@ -104,10 +113,16 @@ int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
return segmentStartUs;
}
-bool PlaylistFetcher::timeToRefreshPlaylist(int64_t nowUs) const {
- if (mPlaylist == NULL) {
+int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
+ int64_t nowUs = ALooper::GetNowUs();
+
+ if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
- return true;
+ return 0ll;
+ }
+
+ if (mPlaylist->isComplete()) {
+ return (~0llu >> 1);
}
int32_t targetDurationSecs;
@@ -158,11 +173,13 @@ bool PlaylistFetcher::timeToRefreshPlaylist(int64_t nowUs) const {
break;
}
- return mLastPlaylistFetchTimeUs + minPlaylistAgeUs <= nowUs;
+ int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
+ return delayUs > 0ll ? delayUs : 0ll;
}
status_t PlaylistFetcher::decryptBuffer(
- size_t playlistIndex, const sp<ABuffer> &buffer) {
+ size_t playlistIndex, const sp<ABuffer> &buffer,
+ bool first) {
sp<AMessage> itemMeta;
bool found = false;
AString method;
@@ -180,6 +197,7 @@ status_t PlaylistFetcher::decryptBuffer(
if (!found) {
method = "NONE";
}
+ buffer->meta()->setString("cipher-method", method.c_str());
if (method == "NONE") {
return OK;
@@ -200,9 +218,9 @@ status_t PlaylistFetcher::decryptBuffer(
if (index >= 0) {
key = mAESKeyForURI.valueAt(index);
} else {
- status_t err = mSession->fetchFile(keyURI.c_str(), &key);
+ ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
- if (err != OK) {
+ if (err < 0) {
ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
return ERROR_IO;
} else if (key->size() != 16) {
@@ -219,63 +237,89 @@ status_t PlaylistFetcher::decryptBuffer(
return UNKNOWN_ERROR;
}
- unsigned char aes_ivec[16];
+ size_t n = buffer->size();
+ if (!n) {
+ return OK;
+ }
+ CHECK(n % 16 == 0);
- AString iv;
- if (itemMeta->findString("cipher-iv", &iv)) {
- if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
- || iv.size() != 16 * 2 + 2) {
- ALOGE("malformed cipher IV '%s'.", iv.c_str());
- return ERROR_MALFORMED;
- }
+ if (first) {
+ // If decrypting the first block in a file, read the iv from the manifest
+ // or derive the iv from the file's sequence number.
- memset(aes_ivec, 0, sizeof(aes_ivec));
- for (size_t i = 0; i < 16; ++i) {
- char c1 = tolower(iv.c_str()[2 + 2 * i]);
- char c2 = tolower(iv.c_str()[3 + 2 * i]);
- if (!isxdigit(c1) || !isxdigit(c2)) {
+ AString iv;
+ if (itemMeta->findString("cipher-iv", &iv)) {
+ if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
+ || iv.size() != 16 * 2 + 2) {
ALOGE("malformed cipher IV '%s'.", iv.c_str());
return ERROR_MALFORMED;
}
- uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
- uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
- aes_ivec[i] = nibble1 << 4 | nibble2;
+ memset(mAESInitVec, 0, sizeof(mAESInitVec));
+ for (size_t i = 0; i < 16; ++i) {
+ char c1 = tolower(iv.c_str()[2 + 2 * i]);
+ char c2 = tolower(iv.c_str()[3 + 2 * i]);
+ if (!isxdigit(c1) || !isxdigit(c2)) {
+ ALOGE("malformed cipher IV '%s'.", iv.c_str());
+ return ERROR_MALFORMED;
+ }
+ uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
+ uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
+
+ mAESInitVec[i] = nibble1 << 4 | nibble2;
+ }
+ } else {
+ memset(mAESInitVec, 0, sizeof(mAESInitVec));
+ mAESInitVec[15] = mSeqNumber & 0xff;
+ mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
+ mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
+ mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
}
- } else {
- memset(aes_ivec, 0, sizeof(aes_ivec));
- aes_ivec[15] = mSeqNumber & 0xff;
- aes_ivec[14] = (mSeqNumber >> 8) & 0xff;
- aes_ivec[13] = (mSeqNumber >> 16) & 0xff;
- aes_ivec[12] = (mSeqNumber >> 24) & 0xff;
}
AES_cbc_encrypt(
buffer->data(), buffer->data(), buffer->size(),
- &aes_key, aes_ivec, AES_DECRYPT);
-
- // hexdump(buffer->data(), buffer->size());
+ &aes_key, mAESInitVec, AES_DECRYPT);
- size_t n = buffer->size();
- CHECK_GT(n, 0u);
+ return OK;
+}
- size_t pad = buffer->data()[n - 1];
+status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
+ status_t err;
+ AString method;
+ CHECK(buffer->meta()->findString("cipher-method", &method));
+ if (method == "NONE") {
+ return OK;
+ }
- CHECK_GT(pad, 0u);
- CHECK_LE(pad, 16u);
- CHECK_GE((size_t)n, pad);
- for (size_t i = 0; i < pad; ++i) {
- CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad);
+ uint8_t padding = 0;
+ if (buffer->size() > 0) {
+ padding = buffer->data()[buffer->size() - 1];
}
- n -= pad;
+ if (padding > 16) {
+ return ERROR_MALFORMED;
+ }
- buffer->setRange(buffer->offset(), n);
+ for (size_t i = buffer->size() - padding; i < padding; i++) {
+ if (buffer->data()[i] != padding) {
+ return ERROR_MALFORMED;
+ }
+ }
+ buffer->setRange(buffer->offset(), buffer->size() - padding);
return OK;
}
-void PlaylistFetcher::postMonitorQueue(int64_t delayUs) {
+void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
+ int64_t maxDelayUs = delayUsToRefreshPlaylist();
+ if (maxDelayUs < minDelayUs) {
+ maxDelayUs = minDelayUs;
+ }
+ if (delayUs > maxDelayUs) {
+ ALOGV("Need to refresh playlist in %lld", maxDelayUs);
+ delayUs = maxDelayUs;
+ }
sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
msg->setInt32("generation", mMonitorQueueGeneration);
msg->post(delayUs);
@@ -289,7 +333,9 @@ void PlaylistFetcher::startAsync(
const sp<AnotherPacketSource> &audioSource,
const sp<AnotherPacketSource> &videoSource,
const sp<AnotherPacketSource> &subtitleSource,
- int64_t startTimeUs) {
+ int64_t startTimeUs,
+ int64_t minStartTimeUs,
+ int32_t startSeqNumberHint) {
sp<AMessage> msg = new AMessage(kWhatStart, id());
uint32_t streamTypeMask = 0ul;
@@ -311,6 +357,8 @@ void PlaylistFetcher::startAsync(
msg->setInt32("streamTypeMask", streamTypeMask);
msg->setInt64("startTimeUs", startTimeUs);
+ msg->setInt64("minStartTimeUs", minStartTimeUs);
+ msg->setInt32("startSeqNumberHint", startSeqNumberHint);
msg->post();
}
@@ -318,8 +366,16 @@ void PlaylistFetcher::pauseAsync() {
(new AMessage(kWhatPause, id()))->post();
}
-void PlaylistFetcher::stopAsync() {
- (new AMessage(kWhatStop, id()))->post();
+void PlaylistFetcher::stopAsync(bool selfTriggered) {
+ sp<AMessage> msg = new AMessage(kWhatStop, id());
+ msg->setInt32("selfTriggered", selfTriggered);
+ msg->post();
+}
+
+void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
+ AMessage* msg = new AMessage(kWhatResumeUntil, id());
+ msg->setMessage("params", params);
+ msg->post();
}
void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
@@ -347,7 +403,7 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
case kWhatStop:
{
- onStop();
+ onStop(msg);
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatStopped);
@@ -356,6 +412,7 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
}
case kWhatMonitorQueue:
+ case kWhatDownloadNext:
{
int32_t generation;
CHECK(msg->findInt32("generation", &generation));
@@ -365,7 +422,17 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
break;
}
- onMonitorQueue();
+ if (msg->what() == kWhatMonitorQueue) {
+ onMonitorQueue();
+ } else {
+ onDownloadNext();
+ }
+ break;
+ }
+
+ case kWhatResumeUntil:
+ {
+ onResumeUntil(msg);
break;
}
@@ -381,7 +448,10 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
int64_t startTimeUs;
+ int32_t startSeqNumberHint;
CHECK(msg->findInt64("startTimeUs", &startTimeUs));
+ CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs));
+ CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint));
if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
void *ptr;
@@ -416,6 +486,11 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
if (mStartTimeUs >= 0ll) {
mSeqNumber = -1;
mStartup = true;
+ mPrepared = false;
+ }
+
+ if (startSeqNumberHint >= 0) {
+ mSeqNumber = startSeqNumberHint;
}
postMonitorQueue();
@@ -425,22 +500,83 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
void PlaylistFetcher::onPause() {
cancelMonitorQueue();
-
- mPacketSources.clear();
- mStreamTypeMask = 0;
}
-void PlaylistFetcher::onStop() {
+void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
cancelMonitorQueue();
- for (size_t i = 0; i < mPacketSources.size(); ++i) {
- mPacketSources.valueAt(i)->clear();
+ int32_t selfTriggered;
+ CHECK(msg->findInt32("selfTriggered", &selfTriggered));
+ if (!selfTriggered) {
+ // Self triggered stops only happen during switching, in which case we do not want
+ // to clear the discontinuities queued at the end of packet sources.
+ for (size_t i = 0; i < mPacketSources.size(); i++) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+ packetSource->clear();
+ }
}
mPacketSources.clear();
mStreamTypeMask = 0;
}
+// Resume until we have reached the boundary timestamps listed in `msg`; when
+// the remaining time is too short (within a resume threshold) stop immediately
+// instead.
+status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
+ sp<AMessage> params;
+ CHECK(msg->findMessage("params", &params));
+
+ bool stop = false;
+ for (size_t i = 0; i < mPacketSources.size(); i++) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+
+ const char *stopKey;
+ int streamType = mPacketSources.keyAt(i);
+ switch (streamType) {
+ case LiveSession::STREAMTYPE_VIDEO:
+ stopKey = "timeUsVideo";
+ break;
+
+ case LiveSession::STREAMTYPE_AUDIO:
+ stopKey = "timeUsAudio";
+ break;
+
+ case LiveSession::STREAMTYPE_SUBTITLES:
+ stopKey = "timeUsSubtitle";
+ break;
+
+ default:
+ TRESPASS();
+ }
+
+ // Don't resume if we would stop within a resume threshold.
+ int64_t latestTimeUs = 0, stopTimeUs = 0;
+ sp<AMessage> latestMeta = packetSource->getLatestMeta();
+ if (latestMeta != NULL
+ && (latestMeta->findInt64("timeUs", &latestTimeUs)
+ && params->findInt64(stopKey, &stopTimeUs))) {
+ int64_t diffUs = stopTimeUs - latestTimeUs;
+ if (diffUs < resumeThreshold(latestMeta)) {
+ stop = true;
+ }
+ }
+ }
+
+ if (stop) {
+ for (size_t i = 0; i < mPacketSources.size(); i++) {
+ mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
+ }
+ stopAsync(/* selfTriggered = */ true);
+ return OK;
+ }
+
+ mStopParams = params;
+ postMonitorQueue();
+
+ return OK;
+}
+
void PlaylistFetcher::notifyError(status_t err) {
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatError);
@@ -457,41 +593,70 @@ void PlaylistFetcher::queueDiscontinuity(
void PlaylistFetcher::onMonitorQueue() {
bool downloadMore = false;
+ refreshPlaylist();
+
+ int32_t targetDurationSecs;
+ int64_t targetDurationUs = kMinBufferedDurationUs;
+ if (mPlaylist != NULL) {
+ CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
+ targetDurationUs = targetDurationSecs * 1000000ll;
+ }
- status_t finalResult;
+ // buffer at least 3 times the target duration, or up to 10 seconds
+ int64_t durationToBufferUs = targetDurationUs * 3;
+ if (durationToBufferUs > kMinBufferedDurationUs) {
+ durationToBufferUs = kMinBufferedDurationUs;
+ }
+
+ int64_t bufferedDurationUs = 0ll;
+ status_t finalResult = NOT_ENOUGH_DATA;
if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
sp<AnotherPacketSource> packetSource =
mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
- int64_t bufferedDurationUs =
+ bufferedDurationUs =
packetSource->getBufferedDurationUs(&finalResult);
-
- downloadMore = (bufferedDurationUs < kMinBufferedDurationUs);
finalResult = OK;
} else {
- bool first = true;
- int64_t minBufferedDurationUs = 0ll;
-
+ // Use max stream duration to prevent us from waiting on a non-existent stream;
+ // when we cannot make out from the manifest what streams are included in a playlist
+ // we might assume extra streams.
for (size_t i = 0; i < mPacketSources.size(); ++i) {
if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
continue;
}
- int64_t bufferedDurationUs =
+ int64_t bufferedStreamDurationUs =
mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
-
- if (first || bufferedDurationUs < minBufferedDurationUs) {
- minBufferedDurationUs = bufferedDurationUs;
- first = false;
+ ALOGV("buffered %lld for stream %d",
+ bufferedStreamDurationUs, mPacketSources.keyAt(i));
+ if (bufferedStreamDurationUs > bufferedDurationUs) {
+ bufferedDurationUs = bufferedStreamDurationUs;
}
}
+ }
+ downloadMore = (bufferedDurationUs < durationToBufferUs);
+
+ // signal start if buffered up at least the target size
+ if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
+ mPrepared = true;
- downloadMore =
- !first && (minBufferedDurationUs < kMinBufferedDurationUs);
+ ALOGV("prepared, buffered=%lld > %lld",
+ bufferedDurationUs, targetDurationUs);
+ sp<AMessage> msg = mNotify->dup();
+ msg->setInt32("what", kWhatTemporarilyDoneFetching);
+ msg->post();
}
if (finalResult == OK && downloadMore) {
- onDownloadNext();
+ ALOGV("monitoring, buffered=%lld < %lld",
+ bufferedDurationUs, durationToBufferUs);
+ // delay the next download slightly; hopefully this gives other concurrent fetchers
+ // a better chance to run.
+ // onDownloadNext();
+ sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
+ msg->setInt32("generation", mMonitorQueueGeneration);
+ msg->post(1000l);
} else {
// Nothing to do yet, try again in a second.
@@ -499,15 +664,17 @@ void PlaylistFetcher::onMonitorQueue() {
msg->setInt32("what", kWhatTemporarilyDoneFetching);
msg->post();
- postMonitorQueue(1000000ll);
+ int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
+ ALOGV("pausing for %lld, buffered=%lld > %lld",
+ delayUs, bufferedDurationUs, durationToBufferUs);
+ // :TRICKY: need to enforce minimum delay because the delay to
+ // refresh the playlist will become 0
+ postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
}
}
-void PlaylistFetcher::onDownloadNext() {
- int64_t nowUs = ALooper::GetNowUs();
-
- if (mLastPlaylistFetchTimeUs < 0ll
- || (!mPlaylist->isComplete() && timeToRefreshPlaylist(nowUs))) {
+status_t PlaylistFetcher::refreshPlaylist() {
+ if (delayUsToRefreshPlaylist() <= 0) {
bool unchanged;
sp<M3UParser> playlist = mSession->fetchPlaylist(
mURI.c_str(), mPlaylistHash, &unchanged);
@@ -523,7 +690,7 @@ void PlaylistFetcher::onDownloadNext() {
} else {
ALOGE("failed to load playlist at url '%s'", mURI.c_str());
notifyError(ERROR_IO);
- return;
+ return ERROR_IO;
}
} else {
mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
@@ -536,6 +703,18 @@ void PlaylistFetcher::onDownloadNext() {
mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
}
+ return OK;
+}
+
+// static
+bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
+ return buffer->size() > 0 && buffer->data()[0] == 0x47;
+}
+
+void PlaylistFetcher::onDownloadNext() {
+ if (refreshPlaylist() != OK) {
+ return;
+ }
int32_t firstSeqNumberInPlaylist;
if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
@@ -549,17 +728,29 @@ void PlaylistFetcher::onDownloadNext() {
const int32_t lastSeqNumberInPlaylist =
firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
+ if (mStartup && mSeqNumber >= 0
+ && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) {
+ // in case we guessed wrong during reconfiguration, try fetching the latest content.
+ mSeqNumber = lastSeqNumberInPlaylist;
+ }
+
if (mSeqNumber < 0) {
CHECK_GE(mStartTimeUs, 0ll);
if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
mSeqNumber = getSeqNumberForTime(mStartTimeUs);
+ ALOGV("Initial sequence number for time %lld is %ld from (%ld .. %ld)",
+ mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist);
} else {
// If this is a live session, start 3 segments from the end.
mSeqNumber = lastSeqNumberInPlaylist - 3;
if (mSeqNumber < firstSeqNumberInPlaylist) {
mSeqNumber = firstSeqNumberInPlaylist;
}
+ ALOGV("Initial sequence number for live event %ld from (%ld .. %ld)",
+ mSeqNumber, firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist);
}
mStartTimeUs = -1ll;
@@ -571,16 +762,34 @@ void PlaylistFetcher::onDownloadNext() {
++mNumRetries;
if (mSeqNumber > lastSeqNumberInPlaylist) {
- mLastPlaylistFetchTimeUs = -1;
- postMonitorQueue(3000000ll);
+ // refresh in increasing fraction (1/2, 1/3, ...) of the
+ // playlist's target duration or 3 seconds, whichever is less
+ int32_t targetDurationSecs;
+ CHECK(mPlaylist->meta()->findInt32(
+ "target-duration", &targetDurationSecs));
+ int64_t delayUs = mPlaylist->size() * targetDurationSecs *
+ 1000000ll / (1 + mNumRetries);
+ if (delayUs > kMaxMonitorDelayUs) {
+ delayUs = kMaxMonitorDelayUs;
+ }
+ ALOGV("sequence number high: %ld from (%ld .. %ld), monitor in %lld (retry=%d)",
+ mSeqNumber, firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist, delayUs, mNumRetries);
+ postMonitorQueue(delayUs);
return;
}
// we've missed the boat, let's start from the lowest sequence
// number available and signal a discontinuity.
- ALOGI("We've missed the boat, restarting playback.");
- mSeqNumber = lastSeqNumberInPlaylist;
+ ALOGI("We've missed the boat, restarting playback."
+ " mStartup=%d, was looking for %d in %d-%d",
+ mStartup, mSeqNumber, firstSeqNumberInPlaylist,
+ lastSeqNumberInPlaylist);
+ mSeqNumber = lastSeqNumberInPlaylist - 3;
+ if (mSeqNumber < firstSeqNumberInPlaylist) {
+ mSeqNumber = firstSeqNumberInPlaylist;
+ }
explicitDiscontinuity = true;
// fall through
@@ -621,50 +830,160 @@ void PlaylistFetcher::onDownloadNext() {
ALOGV("fetching '%s'", uri.c_str());
- sp<ABuffer> buffer;
- status_t err = mSession->fetchFile(
- uri.c_str(), &buffer, range_offset, range_length);
-
- if (err != OK) {
- ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
- notifyError(err);
- return;
+ sp<DataSource> source;
+ sp<ABuffer> buffer, tsBuffer;
+ // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
+ // this avoids interleaved connections to the key and segment file.
+ {
+ sp<ABuffer> junk = new ABuffer(16);
+ junk->setRange(0, 16);
+ status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
+ true /* first */);
+ if (err != OK) {
+ notifyError(err);
+ return;
+ }
}
- CHECK(buffer != NULL);
+ // block-wise download
+ ssize_t bytesRead;
+ do {
+ bytesRead = mSession->fetchFile(
+ uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
+
+ if (bytesRead < 0) {
+ status_t err = bytesRead;
+ ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
+ notifyError(err);
+ return;
+ }
- err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
+ CHECK(buffer != NULL);
- if (err != OK) {
- ALOGE("decryptBuffer failed w/ error %d", err);
+ size_t size = buffer->size();
+ // Set decryption range.
+ buffer->setRange(size - bytesRead, bytesRead);
+ status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
+ buffer->offset() == 0 /* first */);
+ // Unset decryption range.
+ buffer->setRange(0, size);
- notifyError(err);
- return;
- }
+ if (err != OK) {
+ ALOGE("decryptBuffer failed w/ error %d", err);
- if (mStartup || seekDiscontinuity || explicitDiscontinuity) {
- // Signal discontinuity.
+ notifyError(err);
+ return;
+ }
- if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
- // If this was a live event this made no sense since
- // we don't have access to all the segment before the current
- // one.
- mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
+ if (mStartup || seekDiscontinuity || explicitDiscontinuity) {
+ // Signal discontinuity.
+
+ if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
+ // If this was a live event this made no sense since
+ // we don't have access to all the segment before the current
+ // one.
+ mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
+ }
+
+ if (seekDiscontinuity || explicitDiscontinuity) {
+ ALOGI("queueing discontinuity (seek=%d, explicit=%d)",
+ seekDiscontinuity, explicitDiscontinuity);
+
+ queueDiscontinuity(
+ explicitDiscontinuity
+ ? ATSParser::DISCONTINUITY_FORMATCHANGE
+ : ATSParser::DISCONTINUITY_SEEK,
+ NULL /* extra */);
+ }
+ }
+
+ err = OK;
+ if (bufferStartsWithTsSyncByte(buffer)) {
+ // Incremental extraction is only supported for MPEG2 transport streams.
+ if (tsBuffer == NULL) {
+ tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
+ tsBuffer->setRange(0, 0);
+ } else if (tsBuffer->capacity() != buffer->capacity()) {
+ size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
+ tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
+ tsBuffer->setRange(tsOff, tsSize);
+ }
+ tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
+
+ err = extractAndQueueAccessUnitsFromTs(tsBuffer);
+ }
+
+ if (err == -EAGAIN) {
+ // bad starting sequence number hint
+ postMonitorQueue();
+ return;
}
- if (seekDiscontinuity || explicitDiscontinuity) {
- ALOGI("queueing discontinuity (seek=%d, explicit=%d)",
- seekDiscontinuity, explicitDiscontinuity);
+ if (err == ERROR_OUT_OF_RANGE) {
+ // reached stopping point
+ stopAsync(/* selfTriggered = */ true);
+ return;
+ }
- queueDiscontinuity(
- explicitDiscontinuity
- ? ATSParser::DISCONTINUITY_FORMATCHANGE
- : ATSParser::DISCONTINUITY_SEEK,
- NULL /* extra */);
+ if (err != OK) {
+ notifyError(err);
+ return;
}
+
+ mStartup = false;
+ } while (bytesRead != 0);
+
+ if (bufferStartsWithTsSyncByte(buffer)) {
+ // If we still don't see a stream after fetching a full ts segment mark it as
+ // nonexistent.
+ const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
+ ATSParser::SourceType srcTypes[kNumTypes] =
+ { ATSParser::VIDEO, ATSParser::AUDIO };
+ LiveSession::StreamType streamTypes[kNumTypes] =
+ { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
+
+ for (size_t i = 0; i < kNumTypes; i++) {
+ ATSParser::SourceType srcType = srcTypes[i];
+ LiveSession::StreamType streamType = streamTypes[i];
+
+ sp<AnotherPacketSource> source =
+ static_cast<AnotherPacketSource *>(
+ mTSParser->getSource(srcType).get());
+
+ if (source == NULL) {
+ ALOGW("MPEG2 Transport stream does not contain %s data.",
+ srcType == ATSParser::VIDEO ? "video" : "audio");
+
+ mStreamTypeMask &= ~streamType;
+ mPacketSources.removeItem(streamType);
+ }
+ }
+
+ }
+
+ if (checkDecryptPadding(buffer) != OK) {
+ ALOGE("Incorrect padding bytes after decryption.");
+ notifyError(ERROR_MALFORMED);
+ return;
}
- err = extractAndQueueAccessUnits(buffer, itemMeta);
+ status_t err = OK;
+ if (tsBuffer != NULL) {
+ AString method;
+ CHECK(buffer->meta()->findString("cipher-method", &method));
+ if ((tsBuffer->size() > 0 && method == "NONE")
+ || tsBuffer->size() > 16) {
+ ALOGE("MPEG2 transport stream is not an even multiple of 188 "
+ "bytes in length.");
+ notifyError(ERROR_MALFORMED);
+ return;
+ }
+ }
+
+ // bulk extract non-ts files
+ if (tsBuffer == NULL) {
+ err = extractAndQueueAccessUnits(buffer, itemMeta);
+ }
if (err != OK) {
notifyError(err);
@@ -674,8 +993,6 @@ void PlaylistFetcher::onDownloadNext() {
++mSeqNumber;
postMonitorQueue();
-
- mStartup = false;
}
int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
@@ -710,95 +1027,163 @@ int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
return firstSeqNumberInPlaylist + index;
}
-status_t PlaylistFetcher::extractAndQueueAccessUnits(
- const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
- if (buffer->size() > 0 && buffer->data()[0] == 0x47) {
- // Let's assume this is an MPEG2 transport stream.
+status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
+ if (mTSParser == NULL) {
+ // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
+ mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
+ }
- if ((buffer->size() % 188) != 0) {
- ALOGE("MPEG2 transport stream is not an even multiple of 188 "
- "bytes in length.");
- return ERROR_MALFORMED;
- }
+ if (mNextPTSTimeUs >= 0ll) {
+ sp<AMessage> extra = new AMessage;
+ // Since we are using absolute timestamps, signal an offset of 0 to prevent
+ // ATSParser from skewing the timestamps of access units.
+ extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
- if (mTSParser == NULL) {
- mTSParser = new ATSParser;
- }
+ mTSParser->signalDiscontinuity(
+ ATSParser::DISCONTINUITY_SEEK, extra);
- if (mNextPTSTimeUs >= 0ll) {
- sp<AMessage> extra = new AMessage;
- extra->setInt64(IStreamListener::kKeyMediaTimeUs, mNextPTSTimeUs);
+ mNextPTSTimeUs = -1ll;
+ }
- mTSParser->signalDiscontinuity(
- ATSParser::DISCONTINUITY_SEEK, extra);
+ size_t offset = 0;
+ while (offset + 188 <= buffer->size()) {
+ status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
- mNextPTSTimeUs = -1ll;
+ if (err != OK) {
+ return err;
}
- size_t offset = 0;
- while (offset < buffer->size()) {
- status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
+ offset += 188;
+ }
+ // setRange to indicate consumed bytes.
+ buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
+
+ status_t err = OK;
+ for (size_t i = mPacketSources.size(); i-- > 0;) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+
+ const char *key;
+ ATSParser::SourceType type;
+ const LiveSession::StreamType stream = mPacketSources.keyAt(i);
+ switch (stream) {
+ case LiveSession::STREAMTYPE_VIDEO:
+ type = ATSParser::VIDEO;
+ key = "timeUsVideo";
+ break;
+
+ case LiveSession::STREAMTYPE_AUDIO:
+ type = ATSParser::AUDIO;
+ key = "timeUsAudio";
+ break;
- if (err != OK) {
- return err;
+ case LiveSession::STREAMTYPE_SUBTITLES:
+ {
+ ALOGE("MPEG2 Transport streams do not contain subtitles.");
+ return ERROR_MALFORMED;
+ break;
}
- offset += 188;
+ default:
+ TRESPASS();
}
- for (size_t i = mPacketSources.size(); i-- > 0;) {
- sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+ sp<AnotherPacketSource> source =
+ static_cast<AnotherPacketSource *>(
+ mTSParser->getSource(type).get());
- ATSParser::SourceType type;
- switch (mPacketSources.keyAt(i)) {
- case LiveSession::STREAMTYPE_VIDEO:
- type = ATSParser::VIDEO;
- break;
+ if (source == NULL) {
+ continue;
+ }
- case LiveSession::STREAMTYPE_AUDIO:
- type = ATSParser::AUDIO;
+ int64_t timeUs;
+ sp<ABuffer> accessUnit;
+ status_t finalResult;
+ while (source->hasBufferAvailable(&finalResult)
+ && source->dequeueAccessUnit(&accessUnit) == OK) {
+
+ CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
+ if (mMinStartTimeUs > 0) {
+ if (timeUs < mMinStartTimeUs) {
+ // TODO untested path
+ // try a later ts
+ int32_t targetDuration;
+ mPlaylist->meta()->findInt32("target-duration", &targetDuration);
+ int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration;
+ if (incr == 0) {
+ // increment mSeqNumber by at least one
+ incr = 1;
+ }
+ mSeqNumber += incr;
+ err = -EAGAIN;
break;
+ } else {
+ int64_t startTimeUs;
+ if (mStartTimeUsNotify != NULL
+ && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
+ mStartTimeUsNotify->setInt64(key, timeUs);
+
+ uint32_t streamMask = 0;
+ mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
+ streamMask |= mPacketSources.keyAt(i);
+ mStartTimeUsNotify->setInt32("streamMask", streamMask);
+
+ if (streamMask == mStreamTypeMask) {
+ mStartTimeUsNotify->post();
+ mStartTimeUsNotify.clear();
+ }
+ }
+ }
+ }
- case LiveSession::STREAMTYPE_SUBTITLES:
- {
- ALOGE("MPEG2 Transport streams do not contain subtitles.");
- return ERROR_MALFORMED;
+ if (mStopParams != NULL) {
+ // Queue discontinuity in original stream.
+ int64_t stopTimeUs;
+ if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) {
+ packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
+ mStreamTypeMask &= ~stream;
+ mPacketSources.removeItemsAt(i);
break;
}
-
- default:
- TRESPASS();
}
- sp<AnotherPacketSource> source =
- static_cast<AnotherPacketSource *>(
- mTSParser->getSource(type).get());
-
- if (source == NULL) {
- ALOGW("MPEG2 Transport stream does not contain %s data.",
- type == ATSParser::VIDEO ? "video" : "audio");
+ // Note that we do NOT dequeue any discontinuities except for format change.
- mStreamTypeMask &= ~mPacketSources.keyAt(i);
- mPacketSources.removeItemsAt(i);
- continue;
+ // for simplicity, store a reference to the format in each unit
+ sp<MetaData> format = source->getFormat();
+ if (format != NULL) {
+ accessUnit->meta()->setObject("format", format);
}
- sp<ABuffer> accessUnit;
- status_t finalResult;
- while (source->hasBufferAvailable(&finalResult)
- && source->dequeueAccessUnit(&accessUnit) == OK) {
- // Note that we do NOT dequeue any discontinuities.
+ // Stash the sequence number so we can hint future playlist where to start at.
+ accessUnit->meta()->setInt32("seq", mSeqNumber);
+ packetSource->queueAccessUnit(accessUnit);
+ }
- packetSource->queueAccessUnit(accessUnit);
- }
+ if (err != OK) {
+ break;
+ }
+ }
- if (packetSource->getFormat() == NULL) {
- packetSource->setFormat(source->getFormat());
- }
+ if (err != OK) {
+ for (size_t i = mPacketSources.size(); i-- > 0;) {
+ sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
+ packetSource->clear();
}
+ return err;
+ }
- return OK;
- } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) {
+ if (!mStreamTypeMask) {
+ // Signal gap is filled between original and new stream.
+ ALOGV("ERROR OUT OF RANGE");
+ return ERROR_OUT_OF_RANGE;
+ }
+
+ return OK;
+}
+
+status_t PlaylistFetcher::extractAndQueueAccessUnits(
+ const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
+ if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) {
if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
ALOGE("This stream only contains subtitles.");
return ERROR_MALFORMED;
@@ -811,6 +1196,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
CHECK(itemMeta->findInt64("durationUs", &durationUs));
buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
buffer->meta()->setInt64("durationUs", durationUs);
+ buffer->meta()->setInt32("seq", mSeqNumber);
packetSource->queueAccessUnit(buffer);
return OK;
@@ -936,6 +1322,18 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
| (adtsHeader[4] << 3)
| (adtsHeader[5] >> 5);
+ if (aac_frame_length == 0) {
+ const uint8_t *id3Header = adtsHeader;
+ if (!memcmp(id3Header, "ID3", 3)) {
+ ID3 id3(id3Header, buffer->size() - offset, true);
+ if (id3.isValid()) {
+ offset += id3.rawSize();
+ continue;
+ };
+ }
+ return ERROR_MALFORMED;
+ }
+
CHECK_LE(offset + aac_frame_length, buffer->size());
sp<ABuffer> unit = new ABuffer(aac_frame_length);
@@ -947,6 +1345,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
// Each AAC frame encodes 1024 samples.
numSamples += 1024;
+ unit->meta()->setInt32("seq", mSeqNumber);
packetSource->queueAccessUnit(unit);
offset += aac_frame_length;
@@ -974,4 +1373,33 @@ void PlaylistFetcher::updateDuration() {
msg->post();
}
+int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
+ int64_t durationUs, threshold;
+ if (msg->findInt64("durationUs", &durationUs)) {
+ return kNumSkipFrames * durationUs;
+ }
+
+ sp<RefBase> obj;
+ msg->findObject("format", &obj);
+ MetaData *format = static_cast<MetaData *>(obj.get());
+
+ const char *mime;
+ CHECK(format->findCString(kKeyMIMEType, &mime));
+ bool audio = !strncasecmp(mime, "audio/", 6);
+ if (audio) {
+ // Assumes 1000 samples per frame.
+ int32_t sampleRate;
+ CHECK(format->findInt32(kKeySampleRate, &sampleRate));
+ return kNumSkipFrames /* frames */ * 1000 /* samples */
+ * (1000000 / sampleRate) /* sample duration (us) */;
+ } else {
+ int32_t frameRate;
+ if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
+ return kNumSkipFrames * (1000000 / frameRate);
+ }
+ }
+
+ return 500000ll;
+}
+
} // namespace android
diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h
index 1648e02..7e21523 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.h
+++ b/media/libstagefright/httplive/PlaylistFetcher.h
@@ -43,6 +43,7 @@ struct PlaylistFetcher : public AHandler {
kWhatTemporarilyDoneFetching,
kWhatPrepared,
kWhatPreparationFailed,
+ kWhatStartedAt,
};
PlaylistFetcher(
@@ -56,11 +57,15 @@ struct PlaylistFetcher : public AHandler {
const sp<AnotherPacketSource> &audioSource,
const sp<AnotherPacketSource> &videoSource,
const sp<AnotherPacketSource> &subtitleSource,
- int64_t startTimeUs = -1ll);
+ int64_t startTimeUs = -1ll,
+ int64_t minStartTimeUs = 0ll /* start after this timestamp */,
+ int32_t startSeqNumberHint = -1 /* try starting at this sequence number */);
void pauseAsync();
- void stopAsync();
+ void stopAsync(bool selfTriggered = false);
+
+ void resumeUntilAsync(const sp<AMessage> &params);
protected:
virtual ~PlaylistFetcher();
@@ -76,16 +81,28 @@ private:
kWhatPause = 'paus',
kWhatStop = 'stop',
kWhatMonitorQueue = 'moni',
+ kWhatResumeUntil = 'rsme',
+ kWhatDownloadNext = 'dlnx',
};
static const int64_t kMinBufferedDurationUs;
+ static const int64_t kMaxMonitorDelayUs;
+ static const int32_t kDownloadBlockSize;
+ static const int32_t kNumSkipFrames;
+
+ static bool bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer);
+ // notifications to mSession
sp<AMessage> mNotify;
+ sp<AMessage> mStartTimeUsNotify;
+
sp<LiveSession> mSession;
AString mURI;
uint32_t mStreamTypeMask;
int64_t mStartTimeUs;
+ int64_t mMinStartTimeUs; // start fetching no earlier than this value
+ sp<AMessage> mStopParams; // message containing the latest timestamps we should fetch.
KeyedVector<LiveSession::StreamType, sp<AnotherPacketSource> >
mPacketSources;
@@ -97,6 +114,7 @@ private:
int32_t mSeqNumber;
int32_t mNumRetries;
bool mStartup;
+ bool mPrepared;
int64_t mNextPTSTimeUs;
int32_t mMonitorQueueGeneration;
@@ -117,13 +135,29 @@ private:
uint64_t mFirstPTS;
int64_t mAbsoluteTimeAnchorUs;
+ // Stores the initialization vector to decrypt the next block of cipher text, which can
+ // either be derived from the sequence number, read from the manifest, or copied from
+ // the last block of cipher text (cipher-block chaining).
+ unsigned char mAESInitVec[16];
+
+ // Set first to true if decrypting the first segment of a playlist segment. When
+ // first is true, reset the initialization vector based on the available
+ // information in the manifest; otherwise, use the initialization vector as
+ // updated by the last call to AES_cbc_encrypt.
+ //
+ // For the input to decrypt correctly, decryptBuffer must be called on
+ // consecutive byte ranges on block boundaries, e.g. 0..15, 16..47, 48..63,
+ // and so on.
status_t decryptBuffer(
- size_t playlistIndex, const sp<ABuffer> &buffer);
+ size_t playlistIndex, const sp<ABuffer> &buffer,
+ bool first = true);
+ status_t checkDecryptPadding(const sp<ABuffer> &buffer);
- void postMonitorQueue(int64_t delayUs = 0);
+ void postMonitorQueue(int64_t delayUs = 0, int64_t minDelayUs = 0);
void cancelMonitorQueue();
- bool timeToRefreshPlaylist(int64_t nowUs) const;
+ int64_t delayUsToRefreshPlaylist() const;
+ status_t refreshPlaylist();
// Returns the media time in us of the segment specified by seqNumber.
// This is computed by summing the durations of all segments before it.
@@ -131,10 +165,15 @@ private:
status_t onStart(const sp<AMessage> &msg);
void onPause();
- void onStop();
+ void onStop(const sp<AMessage> &msg);
void onMonitorQueue();
void onDownloadNext();
+ // Resume a fetcher to continue until the stopping point stored in msg.
+ status_t onResumeUntil(const sp<AMessage> &msg);
+
+ status_t extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer);
+
status_t extractAndQueueAccessUnits(
const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta);
@@ -147,6 +186,10 @@ private:
void updateDuration();
+ // Before resuming a fetcher in onResume, check the remaining duration is longer than that
+ // returned by resumeThreshold.
+ int64_t resumeThreshold(const sp<AMessage> &msg);
+
DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher);
};
diff --git a/media/libstagefright/matroska/MatroskaExtractor.cpp b/media/libstagefright/matroska/MatroskaExtractor.cpp
index 3647583..0e4dd2b 100644
--- a/media/libstagefright/matroska/MatroskaExtractor.cpp
+++ b/media/libstagefright/matroska/MatroskaExtractor.cpp
@@ -718,41 +718,61 @@ bool MatroskaExtractor::isLiveStreaming() const {
return mIsLiveStreaming;
}
+static int bytesForSize(size_t size) {
+ // use at most 28 bits (4 times 7)
+ CHECK(size <= 0xfffffff);
+
+ if (size > 0x1fffff) {
+ return 4;
+ } else if (size > 0x3fff) {
+ return 3;
+ } else if (size > 0x7f) {
+ return 2;
+ }
+ return 1;
+}
+
+static void storeSize(uint8_t *data, size_t &idx, size_t size) {
+ int numBytes = bytesForSize(size);
+ idx += numBytes;
+
+ data += idx;
+ size_t next = 0;
+ while (numBytes--) {
+ *--data = (size & 0x7f) | next;
+ size >>= 7;
+ next = 0x80;
+ }
+}
+
static void addESDSFromCodecPrivate(
const sp<MetaData> &meta,
bool isAudio, const void *priv, size_t privSize) {
- static const uint8_t kStaticESDS[] = {
- 0x03, 22,
- 0x00, 0x00, // ES_ID
- 0x00, // streamDependenceFlag, URL_Flag, OCRstreamFlag
-
- 0x04, 17,
- 0x40, // ObjectTypeIndication
- 0x00, 0x00, 0x00, 0x00,
- 0x00, 0x00, 0x00, 0x00,
- 0x00, 0x00, 0x00, 0x00,
-
- 0x05,
- // CodecSpecificInfo (with size prefix) follows
- };
- // Make sure all sizes can be coded in a single byte.
- CHECK(privSize + 22 - 2 < 128);
- size_t esdsSize = sizeof(kStaticESDS) + privSize + 1;
+ int privSizeBytesRequired = bytesForSize(privSize);
+ int esdsSize2 = 14 + privSizeBytesRequired + privSize;
+ int esdsSize2BytesRequired = bytesForSize(esdsSize2);
+ int esdsSize1 = 4 + esdsSize2BytesRequired + esdsSize2;
+ int esdsSize1BytesRequired = bytesForSize(esdsSize1);
+ size_t esdsSize = 1 + esdsSize1BytesRequired + esdsSize1;
uint8_t *esds = new uint8_t[esdsSize];
- memcpy(esds, kStaticESDS, sizeof(kStaticESDS));
- uint8_t *ptr = esds + sizeof(kStaticESDS);
- *ptr++ = privSize;
- memcpy(ptr, priv, privSize);
-
- // Increment by codecPrivateSize less 2 bytes that are accounted for
- // already in lengths of 22/17
- esds[1] += privSize - 2;
- esds[6] += privSize - 2;
-
- // Set ObjectTypeIndication.
- esds[7] = isAudio ? 0x40 // Audio ISO/IEC 14496-3
- : 0x20; // Visual ISO/IEC 14496-2
+
+ size_t idx = 0;
+ esds[idx++] = 0x03;
+ storeSize(esds, idx, esdsSize1);
+ esds[idx++] = 0x00; // ES_ID
+ esds[idx++] = 0x00; // ES_ID
+ esds[idx++] = 0x00; // streamDependenceFlag, URL_Flag, OCRstreamFlag
+ esds[idx++] = 0x04;
+ storeSize(esds, idx, esdsSize2);
+ esds[idx++] = isAudio ? 0x40 // Audio ISO/IEC 14496-3
+ : 0x20; // Visual ISO/IEC 14496-2
+ for (int i = 0; i < 12; i++) {
+ esds[idx++] = 0x00;
+ }
+ esds[idx++] = 0x05;
+ storeSize(esds, idx, privSize);
+ memcpy(esds + idx, priv, privSize);
meta->setData(kKeyESDS, 0, esds, esdsSize);
diff --git a/media/libstagefright/mpeg2ts/ATSParser.h b/media/libstagefright/mpeg2ts/ATSParser.h
index a10edc9..8a80069 100644
--- a/media/libstagefright/mpeg2ts/ATSParser.h
+++ b/media/libstagefright/mpeg2ts/ATSParser.h
@@ -71,8 +71,9 @@ struct ATSParser : public RefBase {
void signalEOS(status_t finalResult);
enum SourceType {
- VIDEO,
- AUDIO
+ VIDEO = 0,
+ AUDIO = 1,
+ NUM_SOURCE_TYPES = 2
};
sp<MediaSource> getSource(SourceType type);
diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp
index 13f073a..6dfaa94 100644
--- a/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp
+++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.cpp
@@ -34,7 +34,8 @@ AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
: mIsAudio(false),
mFormat(NULL),
mLastQueuedTimeUs(0),
- mEOSResult(OK) {
+ mEOSResult(OK),
+ mLatestEnqueuedMeta(NULL) {
setFormat(meta);
}
@@ -70,7 +71,27 @@ status_t AnotherPacketSource::stop() {
}
sp<MetaData> AnotherPacketSource::getFormat() {
- return mFormat;
+ Mutex::Autolock autoLock(mLock);
+ if (mFormat != NULL) {
+ return mFormat;
+ }
+
+ List<sp<ABuffer> >::iterator it = mBuffers.begin();
+ while (it != mBuffers.end()) {
+ sp<ABuffer> buffer = *it;
+ int32_t discontinuity;
+ if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
+ break;
+ }
+
+ sp<RefBase> object;
+ if (buffer->meta()->findObject("format", &object)) {
+ return static_cast<MetaData*>(object.get());
+ }
+
+ ++it;
+ }
+ return NULL;
}
status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
@@ -94,6 +115,11 @@ status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
return INFO_DISCONTINUITY;
}
+ sp<RefBase> object;
+ if ((*buffer)->meta()->findObject("format", &object)) {
+ mFormat = static_cast<MetaData*>(object.get());
+ }
+
return OK;
}
@@ -120,17 +146,22 @@ status_t AnotherPacketSource::read(
}
return INFO_DISCONTINUITY;
- } else {
- int64_t timeUs;
- CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
+ }
+
+ sp<RefBase> object;
+ if (buffer->meta()->findObject("format", &object)) {
+ mFormat = static_cast<MetaData*>(object.get());
+ }
- MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
+ int64_t timeUs;
+ CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
- mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
+ MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
- *out = mediaBuffer;
- return OK;
- }
+ mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
+
+ *out = mediaBuffer;
+ return OK;
}
return mEOSResult;
@@ -152,12 +183,24 @@ void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
return;
}
- CHECK(buffer->meta()->findInt64("timeUs", &mLastQueuedTimeUs));
+ int64_t lastQueuedTimeUs;
+ CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
+ mLastQueuedTimeUs = lastQueuedTimeUs;
ALOGV("queueAccessUnit timeUs=%lld us (%.2f secs)", mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
Mutex::Autolock autoLock(mLock);
mBuffers.push_back(buffer);
mCondition.signal();
+
+ if (!mLatestEnqueuedMeta.get()) {
+ mLatestEnqueuedMeta = buffer->meta();
+ } else {
+ int64_t latestTimeUs = 0;
+ CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
+ if (lastQueuedTimeUs > latestTimeUs) {
+ mLatestEnqueuedMeta = buffer->meta();
+ }
+ }
}
void AnotherPacketSource::clear() {
@@ -167,6 +210,7 @@ void AnotherPacketSource::clear() {
mEOSResult = OK;
mFormat = NULL;
+ mLatestEnqueuedMeta = NULL;
}
void AnotherPacketSource::queueDiscontinuity(
@@ -191,6 +235,7 @@ void AnotherPacketSource::queueDiscontinuity(
mEOSResult = OK;
mLastQueuedTimeUs = 0;
+ mLatestEnqueuedMeta = NULL;
sp<ABuffer> buffer = new ABuffer(0);
buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
@@ -278,4 +323,9 @@ bool AnotherPacketSource::isFinished(int64_t duration) const {
return (mEOSResult != OK);
}
+sp<AMessage> AnotherPacketSource::getLatestMeta() {
+ Mutex::Autolock autoLock(mLock);
+ return mLatestEnqueuedMeta;
+}
+
} // namespace android
diff --git a/media/libstagefright/mpeg2ts/AnotherPacketSource.h b/media/libstagefright/mpeg2ts/AnotherPacketSource.h
index e16cf78..9b193a2 100644
--- a/media/libstagefright/mpeg2ts/AnotherPacketSource.h
+++ b/media/libstagefright/mpeg2ts/AnotherPacketSource.h
@@ -62,6 +62,8 @@ struct AnotherPacketSource : public MediaSource {
bool isFinished(int64_t duration) const;
+ sp<AMessage> getLatestMeta();
+
protected:
virtual ~AnotherPacketSource();
@@ -74,6 +76,7 @@ private:
int64_t mLastQueuedTimeUs;
List<sp<ABuffer> > mBuffers;
status_t mEOSResult;
+ sp<AMessage> mLatestEnqueuedMeta;
bool wasFormatChange(int32_t discontinuityType) const;
diff --git a/media/libstagefright/omx/GraphicBufferSource.cpp b/media/libstagefright/omx/GraphicBufferSource.cpp
index 4d3930b..3fe9c23 100644
--- a/media/libstagefright/omx/GraphicBufferSource.cpp
+++ b/media/libstagefright/omx/GraphicBufferSource.cpp
@@ -44,7 +44,11 @@ GraphicBufferSource::GraphicBufferSource(OMXNodeInstance* nodeInstance,
mEndOfStream(false),
mEndOfStreamSent(false),
mRepeatAfterUs(-1ll),
+ mMaxTimestampGapUs(-1ll),
+ mPrevOriginalTimeUs(-1ll),
+ mPrevModifiedTimeUs(-1ll),
mRepeatLastFrameGeneration(0),
+ mRepeatLastFrameTimestamp(-1ll),
mLatestSubmittedBufferId(-1),
mLatestSubmittedBufferFrameNum(0),
mLatestSubmittedBufferUseCount(0),
@@ -301,6 +305,32 @@ void GraphicBufferSource::codecBufferEmptied(OMX_BUFFERHEADERTYPE* header) {
return;
}
+void GraphicBufferSource::codecBufferFilled(OMX_BUFFERHEADERTYPE* header) {
+ Mutex::Autolock autoLock(mMutex);
+
+ if (mMaxTimestampGapUs > 0ll
+ && !(header->nFlags & OMX_BUFFERFLAG_CODECCONFIG)) {
+ ssize_t index = mOriginalTimeUs.indexOfKey(header->nTimeStamp);
+ if (index >= 0) {
+ ALOGV("OUT timestamp: %lld -> %lld",
+ header->nTimeStamp, mOriginalTimeUs[index]);
+ header->nTimeStamp = mOriginalTimeUs[index];
+ mOriginalTimeUs.removeItemsAt(index);
+ } else {
+ // giving up the effort as encoder doesn't appear to preserve pts
+ ALOGW("giving up limiting timestamp gap (pts = %lld)",
+ header->nTimeStamp);
+ mMaxTimestampGapUs = -1ll;
+ }
+ if (mOriginalTimeUs.size() > BufferQueue::NUM_BUFFER_SLOTS) {
+ // something terribly wrong must have happened, giving up...
+ ALOGE("mOriginalTimeUs has too many entries (%d)",
+ mOriginalTimeUs.size());
+ mMaxTimestampGapUs = -1ll;
+ }
+ }
+}
+
void GraphicBufferSource::suspend(bool suspend) {
Mutex::Autolock autoLock(mMutex);
@@ -433,6 +463,7 @@ bool GraphicBufferSource::repeatLatestSubmittedBuffer_l() {
BufferQueue::BufferItem item;
item.mBuf = mLatestSubmittedBufferId;
item.mFrameNumber = mLatestSubmittedBufferFrameNum;
+ item.mTimestamp = mRepeatLastFrameTimestamp;
status_t err = submitBuffer_l(item, cbi);
@@ -442,6 +473,20 @@ bool GraphicBufferSource::repeatLatestSubmittedBuffer_l() {
++mLatestSubmittedBufferUseCount;
+ /* repeat last frame up to kRepeatLastFrameCount times.
+ * in case of static scene, a single repeat might not get rid of encoder
+ * ghosting completely, refresh a couple more times to get better quality
+ */
+ if (--mRepeatLastFrameCount > 0) {
+ mRepeatLastFrameTimestamp = item.mTimestamp + mRepeatAfterUs * 1000;
+
+ if (mReflector != NULL) {
+ sp<AMessage> msg = new AMessage(kWhatRepeatLastFrame, mReflector->id());
+ msg->setInt32("generation", ++mRepeatLastFrameGeneration);
+ msg->post(mRepeatAfterUs);
+ }
+ }
+
return true;
}
@@ -462,8 +507,11 @@ void GraphicBufferSource::setLatestSubmittedBuffer_l(
mLatestSubmittedBufferId = item.mBuf;
mLatestSubmittedBufferFrameNum = item.mFrameNumber;
+ mRepeatLastFrameTimestamp = item.mTimestamp + mRepeatAfterUs * 1000;
+
mLatestSubmittedBufferUseCount = 1;
mRepeatBufferDeferred = false;
+ mRepeatLastFrameCount = kRepeatLastFrameCount;
if (mReflector != NULL) {
sp<AMessage> msg = new AMessage(kWhatRepeatLastFrame, mReflector->id());
@@ -499,9 +547,48 @@ status_t GraphicBufferSource::signalEndOfInputStream() {
return OK;
}
+int64_t GraphicBufferSource::getTimestamp(const BufferQueue::BufferItem &item) {
+ int64_t timeUs = item.mTimestamp / 1000;
+
+ if (mMaxTimestampGapUs > 0ll) {
+ /* Cap timestamp gap between adjacent frames to specified max
+ *
+ * In the scenario of cast mirroring, encoding could be suspended for
+ * prolonged periods. Limiting the pts gap to workaround the problem
+ * where encoder's rate control logic produces huge frames after a
+ * long period of suspension.
+ */
+
+ int64_t originalTimeUs = timeUs;
+ if (mPrevOriginalTimeUs >= 0ll) {
+ if (originalTimeUs < mPrevOriginalTimeUs) {
+ // Drop the frame if it's going backward in time. Bad timestamp
+ // could disrupt encoder's rate control completely.
+ ALOGW("Dropping frame that's going backward in time");
+ return -1;
+ }
+ int64_t timestampGapUs = originalTimeUs - mPrevOriginalTimeUs;
+ timeUs = (timestampGapUs < mMaxTimestampGapUs ?
+ timestampGapUs : mMaxTimestampGapUs) + mPrevModifiedTimeUs;
+ }
+ mPrevOriginalTimeUs = originalTimeUs;
+ mPrevModifiedTimeUs = timeUs;
+ mOriginalTimeUs.add(timeUs, originalTimeUs);
+ ALOGV("IN timestamp: %lld -> %lld", originalTimeUs, timeUs);
+ }
+
+ return timeUs;
+}
+
status_t GraphicBufferSource::submitBuffer_l(
const BufferQueue::BufferItem &item, int cbi) {
ALOGV("submitBuffer_l cbi=%d", cbi);
+
+ int64_t timeUs = getTimestamp(item);
+ if (timeUs < 0ll) {
+ return UNKNOWN_ERROR;
+ }
+
CodecBuffer& codecBuffer(mCodecBuffers.editItemAt(cbi));
codecBuffer.mGraphicBuffer = mBufferSlot[item.mBuf];
codecBuffer.mBuf = item.mBuf;
@@ -517,7 +604,7 @@ status_t GraphicBufferSource::submitBuffer_l(
status_t err = mNodeInstance->emptyDirectBuffer(header, 0,
4 + sizeof(buffer_handle_t), OMX_BUFFERFLAG_ENDOFFRAME,
- item.mTimestamp / 1000);
+ timeUs);
if (err != OK) {
ALOGW("WARNING: emptyDirectBuffer failed: 0x%x", err);
codecBuffer.mGraphicBuffer = NULL;
@@ -611,6 +698,12 @@ void GraphicBufferSource::onFrameAvailable() {
BufferQueue::BufferItem item;
status_t err = mBufferQueue->acquireBuffer(&item, 0);
if (err == OK) {
+ // If this is the first time we're seeing this buffer, add it to our
+ // slot table.
+ if (item.mGraphicBuffer != NULL) {
+ ALOGV("fillCodecBuffer_l: setting mBufferSlot %d", item.mBuf);
+ mBufferSlot[item.mBuf] = item.mGraphicBuffer;
+ }
mBufferQueue->releaseBuffer(item.mBuf, item.mFrameNumber,
EGL_NO_DISPLAY, EGL_NO_SYNC_KHR, item.mFence);
}
@@ -660,6 +753,17 @@ status_t GraphicBufferSource::setRepeatPreviousFrameDelayUs(
return OK;
}
+status_t GraphicBufferSource::setMaxTimestampGapUs(int64_t maxGapUs) {
+ Mutex::Autolock autoLock(mMutex);
+
+ if (mExecuting || maxGapUs <= 0ll) {
+ return INVALID_OPERATION;
+ }
+
+ mMaxTimestampGapUs = maxGapUs;
+
+ return OK;
+}
void GraphicBufferSource::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatRepeatLastFrame:
diff --git a/media/libstagefright/omx/GraphicBufferSource.h b/media/libstagefright/omx/GraphicBufferSource.h
index 9e5eee6..3b0e454 100644
--- a/media/libstagefright/omx/GraphicBufferSource.h
+++ b/media/libstagefright/omx/GraphicBufferSource.h
@@ -87,6 +87,10 @@ public:
// fill it with a new frame of data; otherwise, just mark it as available.
void codecBufferEmptied(OMX_BUFFERHEADERTYPE* header);
+ // Called when omx_message::FILL_BUFFER_DONE is received. (Currently the
+ // buffer source will fix timestamp in the header if needed.)
+ void codecBufferFilled(OMX_BUFFERHEADERTYPE* header);
+
// This is called after the last input frame has been submitted. We
// need to submit an empty buffer with the EOS flag set. If we don't
// have a codec buffer ready, we just set the mEndOfStream flag.
@@ -105,6 +109,15 @@ public:
// state and once this behaviour is specified it cannot be reset.
status_t setRepeatPreviousFrameDelayUs(int64_t repeatAfterUs);
+ // When set, the timestamp fed to the encoder will be modified such that
+ // the gap between two adjacent frames is capped at maxGapUs. Timestamp
+ // will be restored to the original when the encoded frame is returned to
+ // the client.
+ // This is to solve a problem in certain real-time streaming case, where
+ // encoder's rate control logic produces huge frames after a long period
+ // of suspension on input.
+ status_t setMaxTimestampGapUs(int64_t maxGapUs);
+
protected:
// BufferQueue::ConsumerListener interface, called when a new frame of
// data is available. If we're executing and a codec buffer is
@@ -165,6 +178,7 @@ private:
void setLatestSubmittedBuffer_l(const BufferQueue::BufferItem &item);
bool repeatLatestSubmittedBuffer_l();
+ int64_t getTimestamp(const BufferQueue::BufferItem &item);
// Lock, covers all member variables.
mutable Mutex mMutex;
@@ -206,13 +220,22 @@ private:
enum {
kWhatRepeatLastFrame,
};
-
+ enum {
+ kRepeatLastFrameCount = 10,
+ };
int64_t mRepeatAfterUs;
+ int64_t mMaxTimestampGapUs;
+
+ KeyedVector<int64_t, int64_t> mOriginalTimeUs;
+ int64_t mPrevOriginalTimeUs;
+ int64_t mPrevModifiedTimeUs;
sp<ALooper> mLooper;
sp<AHandlerReflector<GraphicBufferSource> > mReflector;
int32_t mRepeatLastFrameGeneration;
+ int64_t mRepeatLastFrameTimestamp;
+ int32_t mRepeatLastFrameCount;
int mLatestSubmittedBufferId;
uint64_t mLatestSubmittedBufferFrameNum;
diff --git a/media/libstagefright/omx/OMXNodeInstance.cpp b/media/libstagefright/omx/OMXNodeInstance.cpp
index 1422210..8391290 100644
--- a/media/libstagefright/omx/OMXNodeInstance.cpp
+++ b/media/libstagefright/omx/OMXNodeInstance.cpp
@@ -849,6 +849,7 @@ status_t OMXNodeInstance::setInternalOption(
switch (type) {
case IOMX::INTERNAL_OPTION_SUSPEND:
case IOMX::INTERNAL_OPTION_REPEAT_PREVIOUS_FRAME_DELAY:
+ case IOMX::INTERNAL_OPTION_MAX_TIMESTAMP_GAP:
{
const sp<GraphicBufferSource> &bufferSource =
getGraphicBufferSource();
@@ -864,7 +865,8 @@ status_t OMXNodeInstance::setInternalOption(
bool suspend = *(bool *)data;
bufferSource->suspend(suspend);
- } else {
+ } else if (type ==
+ IOMX::INTERNAL_OPTION_REPEAT_PREVIOUS_FRAME_DELAY){
if (size != sizeof(int64_t)) {
return INVALID_OPERATION;
}
@@ -872,6 +874,14 @@ status_t OMXNodeInstance::setInternalOption(
int64_t delayUs = *(int64_t *)data;
return bufferSource->setRepeatPreviousFrameDelayUs(delayUs);
+ } else {
+ if (size != sizeof(int64_t)) {
+ return INVALID_OPERATION;
+ }
+
+ int64_t maxGapUs = *(int64_t *)data;
+
+ return bufferSource->setMaxTimestampGapUs(maxGapUs);
}
return OK;
@@ -883,6 +893,8 @@ status_t OMXNodeInstance::setInternalOption(
}
void OMXNodeInstance::onMessage(const omx_message &msg) {
+ const sp<GraphicBufferSource>& bufferSource(getGraphicBufferSource());
+
if (msg.type == omx_message::FILL_BUFFER_DONE) {
OMX_BUFFERHEADERTYPE *buffer =
static_cast<OMX_BUFFERHEADERTYPE *>(
@@ -892,10 +904,18 @@ void OMXNodeInstance::onMessage(const omx_message &msg) {
static_cast<BufferMeta *>(buffer->pAppPrivate);
buffer_meta->CopyFromOMX(buffer);
- } else if (msg.type == omx_message::EMPTY_BUFFER_DONE) {
- const sp<GraphicBufferSource>& bufferSource(getGraphicBufferSource());
if (bufferSource != NULL) {
+ // fix up the buffer info (especially timestamp) if needed
+ bufferSource->codecBufferFilled(buffer);
+
+ omx_message newMsg = msg;
+ newMsg.u.extended_buffer_data.timestamp = buffer->nTimeStamp;
+ mObserver->onMessage(newMsg);
+ return;
+ }
+ } else if (msg.type == omx_message::EMPTY_BUFFER_DONE) {
+ if (bufferSource != NULL) {
// This is one of the buffers used exclusively by
// GraphicBufferSource.
// Don't dispatch a message back to ACodec, since it doesn't
diff --git a/media/libstagefright/wifi-display/source/TSPacketizer.cpp b/media/libstagefright/wifi-display/source/TSPacketizer.cpp
index edcc087..50d317a 100644
--- a/media/libstagefright/wifi-display/source/TSPacketizer.cpp
+++ b/media/libstagefright/wifi-display/source/TSPacketizer.cpp
@@ -216,7 +216,7 @@ sp<ABuffer> TSPacketizer::Track::prependADTSHeader(
uint8_t *ptr = dup->data();
*ptr++ = 0xff;
- *ptr++ = 0xf1; // b11110001, ID=0, layer=0, protection_absent=1
+ *ptr++ = 0xf9; // b11111001, ID=1(MPEG-2), layer=0, protection_absent=1
*ptr++ =
profile << 6