From 26a48f304a8754d655e554178ffb6d7ba4c5aac3 Mon Sep 17 00:00:00 2001 From: Lajos Molnar Date: Thu, 4 Jun 2015 10:30:02 -0700 Subject: stagefright: add support for batching OMX events Bug: 20503131 Change-Id: I762c419ed1245f8b83fb1f6bf61e5557213ca07b --- media/libmedia/IOMX.cpp | 58 +++++++--- media/libstagefright/ACodec.cpp | 149 ++++++++++++++++--------- media/libstagefright/OMXCodec.cpp | 7 +- media/libstagefright/include/OMXNodeInstance.h | 6 + media/libstagefright/omx/OMX.cpp | 27 +++-- media/libstagefright/omx/OMXNodeInstance.cpp | 23 ++-- media/libstagefright/omx/tests/OMXHarness.cpp | 6 +- media/libstagefright/omx/tests/OMXHarness.h | 2 +- 8 files changed, 186 insertions(+), 92 deletions(-) (limited to 'media') diff --git a/media/libmedia/IOMX.cpp b/media/libmedia/IOMX.cpp index ca1cdc7..16da65e 100644 --- a/media/libmedia/IOMX.cpp +++ b/media/libmedia/IOMX.cpp @@ -1077,16 +1077,29 @@ public: : BpInterface(impl) { } - virtual void onMessage(const omx_message &msg) { + virtual void onMessages(const std::list &messages) { Parcel data, reply; - data.writeInterfaceToken(IOMXObserver::getInterfaceDescriptor()); - data.write(&msg, sizeof(msg)); - if (msg.fenceFd >= 0) { - data.writeFileDescriptor(msg.fenceFd, true /* takeOwnership */); + std::list::const_iterator it = messages.cbegin(); + bool first = true; + while (it != messages.cend()) { + const omx_message &msg = *it++; + if (first) { + data.writeInterfaceToken(IOMXObserver::getInterfaceDescriptor()); + data.writeInt32(msg.node); + first = false; + } + data.writeInt32(msg.fenceFd >= 0); + if (msg.fenceFd >= 0) { + data.writeFileDescriptor(msg.fenceFd, true /* takeOwnership */); + } + data.writeInt32(msg.type); + data.write(&msg.u, sizeof(msg.u)); + ALOGV("onMessage writing message %d, size %zu", msg.type, sizeof(msg)); + } + if (!first) { + data.writeInt32(-1); // mark end + remote()->transact(OBSERVER_ON_MSG, data, &reply, IBinder::FLAG_ONEWAY); } - ALOGV("onMessage writing message %d, size %zu", msg.type, sizeof(msg)); - - remote()->transact(OBSERVER_ON_MSG, data, &reply, IBinder::FLAG_ONEWAY); } }; @@ -1098,19 +1111,28 @@ status_t BnOMXObserver::onTransact( case OBSERVER_ON_MSG: { CHECK_OMX_INTERFACE(IOMXObserver, data, reply); + IOMX::node_id node = data.readInt32(); + std::list messages; + status_t err = FAILED_TRANSACTION; // must receive at least one message + do { + int haveFence = data.readInt32(); + if (haveFence < 0) { // we use -1 to mark end of messages + break; + } + omx_message msg; + msg.node = node; + msg.fenceFd = haveFence ? ::dup(data.readFileDescriptor()) : -1; + msg.type = (typeof(msg.type))data.readInt32(); + err = data.read(&msg.u, sizeof(msg.u)); + ALOGV("onTransact reading message %d, size %zu", msg.type, sizeof(msg)); + messages.push_back(msg); + } while (err == OK); - omx_message msg; - data.read(&msg, sizeof(msg)); - if (msg.fenceFd >= 0) { - msg.fenceFd = ::dup(data.readFileDescriptor()); + if (err == OK) { + onMessages(messages); } - ALOGV("onTransact reading message %d, size %zu", msg.type, sizeof(msg)); - - // XXX Could use readInplace maybe? - onMessage(msg); - - return NO_ERROR; + return err; } default: diff --git a/media/libstagefright/ACodec.cpp b/media/libstagefright/ACodec.cpp index 3a98e8c..172e19c 100644 --- a/media/libstagefright/ACodec.cpp +++ b/media/libstagefright/ACodec.cpp @@ -106,6 +106,16 @@ static void InitOMXParams(T *params) { params->nVersion.s.nStep = 0; } +struct MessageList : public RefBase { + MessageList() { + } + std::list > &getList() { return mList; } +private: + std::list > mList; + + DISALLOW_EVIL_CONSTRUCTORS(MessageList); +}; + struct CodecObserver : public BnOMXObserver { CodecObserver() {} @@ -114,55 +124,65 @@ struct CodecObserver : public BnOMXObserver { } // from IOMXObserver - virtual void onMessage(const omx_message &omx_msg) { - sp msg = mNotify->dup(); - - msg->setInt32("type", omx_msg.type); - msg->setInt32("node", omx_msg.node); - - switch (omx_msg.type) { - case omx_message::EVENT: - { - msg->setInt32("event", omx_msg.u.event_data.event); - msg->setInt32("data1", omx_msg.u.event_data.data1); - msg->setInt32("data2", omx_msg.u.event_data.data2); - break; + virtual void onMessages(const std::list &messages) { + sp notify; + bool first = true; + sp msgList = new MessageList(); + for (std::list::const_iterator it = messages.cbegin(); + it != messages.cend(); ++it) { + const omx_message &omx_msg = *it; + if (first) { + notify = mNotify->dup(); + notify->setInt32("node", omx_msg.node); } - case omx_message::EMPTY_BUFFER_DONE: - { - msg->setInt32("buffer", omx_msg.u.buffer_data.buffer); - msg->setInt32("fence_fd", omx_msg.fenceFd); - break; - } + sp msg = new AMessage; + msg->setInt32("type", omx_msg.type); + switch (omx_msg.type) { + case omx_message::EVENT: + { + msg->setInt32("event", omx_msg.u.event_data.event); + msg->setInt32("data1", omx_msg.u.event_data.data1); + msg->setInt32("data2", omx_msg.u.event_data.data2); + break; + } - case omx_message::FILL_BUFFER_DONE: - { - msg->setInt32( - "buffer", omx_msg.u.extended_buffer_data.buffer); - msg->setInt32( - "range_offset", - omx_msg.u.extended_buffer_data.range_offset); - msg->setInt32( - "range_length", - omx_msg.u.extended_buffer_data.range_length); - msg->setInt32( - "flags", - omx_msg.u.extended_buffer_data.flags); - msg->setInt64( - "timestamp", - omx_msg.u.extended_buffer_data.timestamp); - msg->setInt32( - "fence_fd", omx_msg.fenceFd); - break; - } + case omx_message::EMPTY_BUFFER_DONE: + { + msg->setInt32("buffer", omx_msg.u.buffer_data.buffer); + msg->setInt32("fence_fd", omx_msg.fenceFd); + break; + } - default: - ALOGE("Unrecognized message type: %d", omx_msg.type); - break; - } + case omx_message::FILL_BUFFER_DONE: + { + msg->setInt32( + "buffer", omx_msg.u.extended_buffer_data.buffer); + msg->setInt32( + "range_offset", + omx_msg.u.extended_buffer_data.range_offset); + msg->setInt32( + "range_length", + omx_msg.u.extended_buffer_data.range_length); + msg->setInt32( + "flags", + omx_msg.u.extended_buffer_data.flags); + msg->setInt64( + "timestamp", + omx_msg.u.extended_buffer_data.timestamp); + msg->setInt32( + "fence_fd", omx_msg.fenceFd); + break; + } - msg->post(); + default: + ALOGE("Unrecognized message type: %d", omx_msg.type); + break; + } + msgList->getList().push_back(msg); + } + notify->setObject("messages", msgList); + notify->post(); } protected: @@ -200,8 +220,15 @@ protected: void postFillThisBuffer(BufferInfo *info); private: + // Handles an OMX message. Returns true iff message was handled. bool onOMXMessage(const sp &msg); + // Handles a list of messages. Returns true iff messages were handled. + bool onOMXMessageList(const sp &msg); + + // returns true iff this message is for this component and the component is alive + bool checkOMXMessage(const sp &msg); + bool onOMXEmptyBufferDone(IOMX::buffer_id bufferID, int fenceFd); bool onOMXFillBufferDone( @@ -4402,9 +4429,14 @@ bool ACodec::BaseState::onMessageReceived(const sp &msg) { break; } + case ACodec::kWhatOMXMessageList: + { + return checkOMXMessage(msg) ? onOMXMessageList(msg) : true; + } + case ACodec::kWhatOMXMessage: { - return onOMXMessage(msg); + return checkOMXMessage(msg) ? onOMXMessage(msg) : true; } case ACodec::kWhatSetSurface: @@ -4463,16 +4495,13 @@ bool ACodec::BaseState::onMessageReceived(const sp &msg) { return true; } -bool ACodec::BaseState::onOMXMessage(const sp &msg) { - int32_t type; - CHECK(msg->findInt32("type", &type)); - +bool ACodec::BaseState::checkOMXMessage(const sp &msg) { // there is a possibility that this is an outstanding message for a // codec that we have already destroyed if (mCodec->mNode == 0) { ALOGI("ignoring message as already freed component: %s", msg->debugString().c_str()); - return true; + return false; } IOMX::node_id nodeID; @@ -4481,6 +4510,24 @@ bool ACodec::BaseState::onOMXMessage(const sp &msg) { ALOGE("Unexpected message for nodeID: %u, should have been %u", nodeID, mCodec->mNode); return false; } + return true; +} + +bool ACodec::BaseState::onOMXMessageList(const sp &msg) { + sp obj; + CHECK(msg->findObject("messages", &obj)); + sp msgList = static_cast(obj.get()); + + for (std::list>::const_iterator it = msgList->getList().cbegin(); + it != msgList->getList().cend(); ++it) { + onOMXMessage(*it); + } + return true; +} + +bool ACodec::BaseState::onOMXMessage(const sp &msg) { + int32_t type; + CHECK(msg->findInt32("type", &type)); switch (type) { case omx_message::EVENT: @@ -5316,7 +5363,7 @@ bool ACodec::UninitializedState::onAllocateComponent(const sp &msg) { return false; } - notify = new AMessage(kWhatOMXMessage, mCodec); + notify = new AMessage(kWhatOMXMessageList, mCodec); observer->setNotificationMessage(notify); mCodec->mComponentName = componentName; diff --git a/media/libstagefright/OMXCodec.cpp b/media/libstagefright/OMXCodec.cpp index 927cc6c..96aa808 100644 --- a/media/libstagefright/OMXCodec.cpp +++ b/media/libstagefright/OMXCodec.cpp @@ -116,12 +116,15 @@ struct OMXCodecObserver : public BnOMXObserver { } // from IOMXObserver - virtual void onMessage(const omx_message &msg) { + virtual void onMessages(const std::list &messages) { sp codec = mTarget.promote(); if (codec.get() != NULL) { Mutex::Autolock autoLock(codec->mLock); - codec->on_message(msg); + for (std::list::const_iterator it = messages.cbegin(); + it != messages.cend(); ++it) { + codec->on_message(*it); + } codec.clear(); } } diff --git a/media/libstagefright/include/OMXNodeInstance.h b/media/libstagefright/include/OMXNodeInstance.h index 76df815..f68e0a9 100644 --- a/media/libstagefright/include/OMXNodeInstance.h +++ b/media/libstagefright/include/OMXNodeInstance.h @@ -125,6 +125,8 @@ struct OMXNodeInstance { const void *data, size_t size); + // handles messages and removes them from the list + void onMessages(std::list &messages); void onMessage(const omx_message &msg); void onObserverDied(OMXMaster *master); void onGetHandleFailed(); @@ -231,6 +233,10 @@ private: sp getGraphicBufferSource(); void setGraphicBufferSource(const sp& bufferSource); + // Handles |msg|, and may modify it. Returns true iff completely handled it and + // |msg| does not need to be sent to the event listener. + bool handleMessage(omx_message &msg); + OMXNodeInstance(const OMXNodeInstance &); OMXNodeInstance &operator=(const OMXNodeInstance &); }; diff --git a/media/libstagefright/omx/OMX.cpp b/media/libstagefright/omx/OMX.cpp index 76217ec..e94adbd 100644 --- a/media/libstagefright/omx/OMX.cpp +++ b/media/libstagefright/omx/OMX.cpp @@ -61,7 +61,11 @@ private: struct OMX::CallbackDispatcher : public RefBase { CallbackDispatcher(OMXNodeInstance *owner); - void post(const omx_message &msg); + // Posts |msg| to the listener's queue. If |realTime| is true, the listener thread is notified + // that a new message is available on the queue. Otherwise, the message stays on the queue, but + // the listener is not notified of it. It will process this message when a subsequent message + // is posted with |realTime| set to true. + void post(const omx_message &msg, bool realTime = true); bool loop(); @@ -74,11 +78,11 @@ private: OMXNodeInstance *mOwner; bool mDone; Condition mQueueChanged; - List mQueue; + std::list mQueue; sp mThread; - void dispatch(const omx_message &msg); + void dispatch(std::list &messages); CallbackDispatcher(const CallbackDispatcher &); CallbackDispatcher &operator=(const CallbackDispatcher &); @@ -109,24 +113,26 @@ OMX::CallbackDispatcher::~CallbackDispatcher() { } } -void OMX::CallbackDispatcher::post(const omx_message &msg) { +void OMX::CallbackDispatcher::post(const omx_message &msg, bool realTime) { Mutex::Autolock autoLock(mLock); mQueue.push_back(msg); - mQueueChanged.signal(); + if (realTime) { + mQueueChanged.signal(); + } } -void OMX::CallbackDispatcher::dispatch(const omx_message &msg) { +void OMX::CallbackDispatcher::dispatch(std::list &messages) { if (mOwner == NULL) { ALOGV("Would have dispatched a message to a node that's already gone."); return; } - mOwner->onMessage(msg); + mOwner->onMessages(messages); } bool OMX::CallbackDispatcher::loop() { for (;;) { - omx_message msg; + std::list messages; { Mutex::Autolock autoLock(mLock); @@ -138,11 +144,10 @@ bool OMX::CallbackDispatcher::loop() { break; } - msg = *mQueue.begin(); - mQueue.erase(mQueue.begin()); + messages.swap(mQueue); } - dispatch(msg); + dispatch(messages); } return false; diff --git a/media/libstagefright/omx/OMXNodeInstance.cpp b/media/libstagefright/omx/OMXNodeInstance.cpp index 9e399f9..7e92da8 100644 --- a/media/libstagefright/omx/OMXNodeInstance.cpp +++ b/media/libstagefright/omx/OMXNodeInstance.cpp @@ -1357,7 +1357,7 @@ status_t OMXNodeInstance::setInternalOption( } } -void OMXNodeInstance::onMessage(const omx_message &msg) { +bool OMXNodeInstance::handleMessage(omx_message &msg) { const sp& bufferSource(getGraphicBufferSource()); if (msg.type == omx_message::FILL_BUFFER_DONE) { @@ -1384,10 +1384,7 @@ void OMXNodeInstance::onMessage(const omx_message &msg) { // fix up the buffer info (especially timestamp) if needed bufferSource->codecBufferFilled(buffer); - omx_message newMsg = msg; - newMsg.u.extended_buffer_data.timestamp = buffer->nTimeStamp; - mObserver->onMessage(newMsg); - return; + msg.u.extended_buffer_data.timestamp = buffer->nTimeStamp; } } else if (msg.type == omx_message::EMPTY_BUFFER_DONE) { OMX_BUFFERHEADERTYPE *buffer = @@ -1408,11 +1405,23 @@ void OMXNodeInstance::onMessage(const omx_message &msg) { // know that anyone asked to have the buffer emptied and will // be very confused. bufferSource->codecBufferEmptied(buffer, msg.fenceFd); - return; + return true; + } + } + + return false; +} + +void OMXNodeInstance::onMessages(std::list &messages) { + for (std::list::iterator it = messages.begin(); it != messages.end(); ) { + if (handleMessage(*it)) { + messages.erase(it++); + } else { + ++it; } } - mObserver->onMessage(msg); + mObserver->onMessages(messages); } void OMXNodeInstance::onObserverDied(OMXMaster *master) { diff --git a/media/libstagefright/omx/tests/OMXHarness.cpp b/media/libstagefright/omx/tests/OMXHarness.cpp index 294b2ed..644b6ed 100644 --- a/media/libstagefright/omx/tests/OMXHarness.cpp +++ b/media/libstagefright/omx/tests/OMXHarness.cpp @@ -64,9 +64,11 @@ status_t Harness::initOMX() { return mOMX != 0 ? OK : NO_INIT; } -void Harness::onMessage(const omx_message &msg) { +void Harness::onMessages(const std::list &messages) { Mutex::Autolock autoLock(mLock); - mMessageQueue.push_back(msg); + for (std::list::const_iterator it = messages.cbegin(); it != messages.cend(); ) { + mMessageQueue.push_back(*it++); + } mMessageAddedCondition.signal(); } diff --git a/media/libstagefright/omx/tests/OMXHarness.h b/media/libstagefright/omx/tests/OMXHarness.h index bb8fd0c..1ebf3aa 100644 --- a/media/libstagefright/omx/tests/OMXHarness.h +++ b/media/libstagefright/omx/tests/OMXHarness.h @@ -74,7 +74,7 @@ struct Harness : public BnOMXObserver { status_t testAll(); - virtual void onMessage(const omx_message &msg); + virtual void onMessages(const std::list &messages); protected: virtual ~Harness(); -- cgit v1.1