diff options
| author | Lajos Molnar <lajos@google.com> | 2015-03-05 14:35:41 -0800 | 
|---|---|---|
| committer | Lajos Molnar <lajos@google.com> | 2015-03-10 10:36:09 -0700 | 
| commit | 3f27436a9346f043f52265da1e6a74cde2bffd4d (patch) | |
| tree | a7404d2ba96071bea1f02f93f6782214fc1e4cc1 | |
| parent | 0655386a0dc15fc31883d2e38917ff0e9db89ae7 (diff) | |
| download | frameworks_av-3f27436a9346f043f52265da1e6a74cde2bffd4d.zip frameworks_av-3f27436a9346f043f52265da1e6a74cde2bffd4d.tar.gz frameworks_av-3f27436a9346f043f52265da1e6a74cde2bffd4d.tar.bz2  | |
stagefright: don't use ALooperRoster mutex for reply handling
Change replyID-s from uint32_t to an object
Move reply handling into the loopers (to reuse a common mutex)
Bug: 19607784
Change-Id: Iaa035b846c424c5687ed17ce1079b325e86c54be
23 files changed, 198 insertions, 118 deletions
diff --git a/cmds/stagefright/SimplePlayer.cpp b/cmds/stagefright/SimplePlayer.cpp index 40762e5..ac1a547 100644 --- a/cmds/stagefright/SimplePlayer.cpp +++ b/cmds/stagefright/SimplePlayer.cpp @@ -116,7 +116,7 @@ void SimplePlayer::onMessageReceived(const sp<AMessage> &msg) {                  mState = UNPREPARED;              } -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              sp<AMessage> response = new AMessage; @@ -139,7 +139,7 @@ void SimplePlayer::onMessageReceived(const sp<AMessage> &msg) {                  err = OK;              } -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              sp<AMessage> response = new AMessage; @@ -161,7 +161,7 @@ void SimplePlayer::onMessageReceived(const sp<AMessage> &msg) {                  }              } -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              sp<AMessage> response = new AMessage; @@ -194,7 +194,7 @@ void SimplePlayer::onMessageReceived(const sp<AMessage> &msg) {                  }              } -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              sp<AMessage> response = new AMessage; @@ -217,7 +217,7 @@ void SimplePlayer::onMessageReceived(const sp<AMessage> &msg) {                  }              } -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              sp<AMessage> response = new AMessage; @@ -240,7 +240,7 @@ void SimplePlayer::onMessageReceived(const sp<AMessage> &msg) {                  mState = UNINITIALIZED;              } -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              sp<AMessage> response = new AMessage; diff --git a/include/media/stagefright/MediaCodec.h b/include/media/stagefright/MediaCodec.h index d448097..8241e19 100644 --- a/include/media/stagefright/MediaCodec.h +++ b/include/media/stagefright/MediaCodec.h @@ -27,6 +27,7 @@ namespace android {  struct ABuffer;  struct AMessage; +struct AReplyToken;  struct AString;  struct CodecBase;  struct ICrypto; @@ -222,7 +223,7 @@ private:      sp<ALooper> mCodecLooper;      sp<CodecBase> mCodec;      AString mComponentName; -    uint32_t mReplyID; +    sp<AReplyToken> mReplyID;      uint32_t mFlags;      status_t mStickyError;      sp<Surface> mNativeWindow; @@ -249,10 +250,10 @@ private:      Vector<BufferInfo> mPortBuffers[2];      int32_t mDequeueInputTimeoutGeneration; -    uint32_t mDequeueInputReplyID; +    sp<AReplyToken> mDequeueInputReplyID;      int32_t mDequeueOutputTimeoutGeneration; -    uint32_t mDequeueOutputReplyID; +    sp<AReplyToken> mDequeueOutputReplyID;      sp<ICrypto> mCrypto; @@ -267,7 +268,7 @@ private:      static status_t PostAndAwaitResponse(              const sp<AMessage> &msg, sp<AMessage> *response); -    static void PostReplyWithError(int32_t replyID, int32_t err); +    static void PostReplyWithError(const sp<AReplyToken> &replyID, int32_t err);      status_t init(const AString &name, bool nameIsType, bool encoder); @@ -283,8 +284,8 @@ private:              size_t portIndex, size_t index,              sp<ABuffer> *buffer, sp<AMessage> *format); -    bool handleDequeueInputBuffer(uint32_t replyID, bool newRequest = false); -    bool handleDequeueOutputBuffer(uint32_t replyID, bool newRequest = false); +    bool handleDequeueInputBuffer(const sp<AReplyToken> &replyID, bool newRequest = false); +    bool handleDequeueOutputBuffer(const sp<AReplyToken> &replyID, bool newRequest = false);      void cancelPendingDequeueOperations();      void extractCSD(const sp<AMessage> &format); diff --git a/include/media/stagefright/MediaCodecSource.h b/include/media/stagefright/MediaCodecSource.h index 0970b2b..7b8f59d 100644 --- a/include/media/stagefright/MediaCodecSource.h +++ b/include/media/stagefright/MediaCodecSource.h @@ -25,6 +25,7 @@ namespace android {  class ALooper;  class AMessage; +struct AReplyToken;  class IGraphicBufferProducer;  class MediaCodec;  class MetaData; @@ -99,7 +100,7 @@ private:      sp<Puller> mPuller;      sp<MediaCodec> mEncoder;      uint32_t mFlags; -    List<uint32_t> mStopReplyIDQueue; +    List<sp<AReplyToken>> mStopReplyIDQueue;      bool mIsVideo;      bool mStarted;      bool mStopping; diff --git a/include/media/stagefright/foundation/ALooper.h b/include/media/stagefright/foundation/ALooper.h index 150cdba..09c469b 100644 --- a/include/media/stagefright/foundation/ALooper.h +++ b/include/media/stagefright/foundation/ALooper.h @@ -30,6 +30,7 @@ namespace android {  struct AHandler;  struct AMessage; +struct AReplyToken;  struct ALooper : public RefBase {      typedef int32_t event_id; @@ -79,7 +80,27 @@ private:      sp<LooperThread> mThread;      bool mRunningLocally; +    // use a separate lock for reply handling, as it is always on another thread +    // use a central lock, however, to avoid creating a mutex for each reply +    Mutex mRepliesLock; +    Condition mRepliesCondition; + +    // START --- methods used only by AMessage + +    // posts a message on this looper with the given timeout      void post(const sp<AMessage> &msg, int64_t delayUs); + +    // creates a reply token to be used with this looper +    sp<AReplyToken> createReplyToken(); +    // waits for a response for the reply token.  If status is OK, the response +    // is stored into the supplied variable.  Otherwise, it is unchanged. +    status_t awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response); +    // posts a reply for a reply token.  If the reply could be successfully posted, +    // it returns OK. Otherwise, it returns an error value. +    status_t postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &msg); + +    // END --- methods used only by AMessage +      bool loop();      DISALLOW_EVIL_CONSTRUCTORS(ALooper); diff --git a/include/media/stagefright/foundation/ALooperRoster.h b/include/media/stagefright/foundation/ALooperRoster.h index 96bce6b..9912455 100644 --- a/include/media/stagefright/foundation/ALooperRoster.h +++ b/include/media/stagefright/foundation/ALooperRoster.h @@ -33,11 +33,6 @@ struct ALooperRoster {      void unregisterHandler(ALooper::handler_id handlerID);      void unregisterStaleHandlers(); -    status_t postAndAwaitResponse( -            const sp<AMessage> &msg, sp<AMessage> *response); - -    void postReply(uint32_t replyID, const sp<AMessage> &reply); -      void dump(int fd, const Vector<String16>& args);  private: @@ -49,10 +44,6 @@ private:      Mutex mLock;      KeyedVector<ALooper::handler_id, HandlerInfo> mHandlers;      ALooper::handler_id mNextHandlerID; -    uint32_t mNextReplyID; -    Condition mRepliesCondition; - -    KeyedVector<uint32_t, sp<AMessage> > mReplies;      DISALLOW_EVIL_CONSTRUCTORS(ALooperRoster);  }; diff --git a/include/media/stagefright/foundation/AMessage.h b/include/media/stagefright/foundation/AMessage.h index ab3279d..4c6bd21 100644 --- a/include/media/stagefright/foundation/AMessage.h +++ b/include/media/stagefright/foundation/AMessage.h @@ -30,6 +30,34 @@ struct AHandler;  struct AString;  struct Parcel; +struct AReplyToken : public RefBase { +    AReplyToken(const sp<ALooper> &looper) +        : mLooper(looper), +          mReplied(false) { +    } + +private: +    friend struct AMessage; +    friend struct ALooper; +    wp<ALooper> mLooper; +    sp<AMessage> mReply; +    bool mReplied; + +    sp<ALooper> getLooper() const { +        return mLooper.promote(); +    } +    // if reply is not set, returns false; otherwise, it retrieves the reply and returns true +    bool retrieveReply(sp<AMessage> *reply) { +        if (mReplied) { +            *reply = mReply; +            mReply.clear(); +        } +        return mReplied; +    } +    // sets the reply for this token. returns OK or error +    status_t setReply(const sp<AMessage> &reply); +}; +  struct AMessage : public RefBase {      AMessage();      AMessage(uint32_t what, const sp<const AHandler> &handler); @@ -84,11 +112,15 @@ struct AMessage : public RefBase {      status_t postAndAwaitResponse(sp<AMessage> *response);      // If this returns true, the sender of this message is synchronously -    // awaiting a response, the "replyID" can be used to send the response -    // via "postReply" below. -    bool senderAwaitsResponse(uint32_t *replyID) const; - -    void postReply(uint32_t replyID); +    // awaiting a response and the reply token is consumed from the message +    // and stored into replyID. The reply token must be used to send the response +    // using "postReply" below. +    bool senderAwaitsResponse(sp<AReplyToken> *replyID); + +    // Posts the message as a response to a reply token.  A reply token can +    // only be used once. Returns OK if the response could be posted; otherwise, +    // an error. +    status_t postReply(const sp<AReplyToken> &replyID);      // Performs a deep-copy of "this", contained messages are in turn "dup'ed".      // Warning: RefBase items, i.e. "objects" are _not_ copied but only have diff --git a/media/libmediaplayerservice/nuplayer/GenericSource.cpp b/media/libmediaplayerservice/nuplayer/GenericSource.cpp index 24163e8..70ae85e 100644 --- a/media/libmediaplayerservice/nuplayer/GenericSource.cpp +++ b/media/libmediaplayerservice/nuplayer/GenericSource.cpp @@ -888,7 +888,7 @@ void NuPlayer::GenericSource::onMessageReceived(const sp<AMessage> &msg) {                mVideoTrack.mPackets->clear();            }            sp<AMessage> response = new AMessage; -          uint32_t replyID; +          sp<AReplyToken> replyID;            CHECK(msg->senderAwaitsResponse(&replyID));            response->postReply(replyID);            break; @@ -990,7 +990,7 @@ void NuPlayer::GenericSource::onGetFormatMeta(sp<AMessage> msg) const {      sp<MetaData> format = doGetFormatMeta(audio);      response->setPointer("format", format.get()); -    uint32_t replyID; +    sp<AReplyToken> replyID;      CHECK(msg->senderAwaitsResponse(&replyID));      response->postReply(replyID);  } @@ -1153,7 +1153,7 @@ void NuPlayer::GenericSource::onGetSelectedTrack(sp<AMessage> msg) const {      ssize_t index = doGetSelectedTrack(type);      response->setInt32("index", index); -    uint32_t replyID; +    sp<AReplyToken> replyID;      CHECK(msg->senderAwaitsResponse(&replyID));      response->postReply(replyID);  } @@ -1211,7 +1211,7 @@ void NuPlayer::GenericSource::onSelectTrack(sp<AMessage> msg) {      status_t err = doSelectTrack(trackIndex, select, timeUs);      response->setInt32("err", err); -    uint32_t replyID; +    sp<AReplyToken> replyID;      CHECK(msg->senderAwaitsResponse(&replyID));      response->postReply(replyID);  } @@ -1324,7 +1324,7 @@ void NuPlayer::GenericSource::onSeek(sp<AMessage> msg) {      status_t err = doSeek(seekTimeUs);      response->setInt32("err", err); -    uint32_t replyID; +    sp<AReplyToken> replyID;      CHECK(msg->senderAwaitsResponse(&replyID));      response->postReply(replyID);  } diff --git a/media/libmediaplayerservice/nuplayer/NuPlayer.cpp b/media/libmediaplayerservice/nuplayer/NuPlayer.cpp index 007fbb9..f4d3794 100644 --- a/media/libmediaplayerservice/nuplayer/NuPlayer.cpp +++ b/media/libmediaplayerservice/nuplayer/NuPlayer.cpp @@ -408,7 +408,7 @@ void NuPlayer::onMessageReceived(const sp<AMessage> &msg) {          case kWhatGetTrackInfo:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              Parcel* reply; @@ -461,7 +461,7 @@ void NuPlayer::onMessageReceived(const sp<AMessage> &msg) {              sp<AMessage> response = new AMessage;              response->setInt32("err", err); -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              response->postReply(replyID);              break; @@ -469,7 +469,7 @@ void NuPlayer::onMessageReceived(const sp<AMessage> &msg) {          case kWhatSelectTrack:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              size_t trackIndex; diff --git a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderBase.cpp b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderBase.cpp index 72e7b0d..a726239 100644 --- a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderBase.cpp +++ b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderBase.cpp @@ -136,7 +136,7 @@ void NuPlayer::DecoderBase::onMessageReceived(const sp<AMessage> &msg) {          case kWhatGetInputBuffers:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              Vector<sp<ABuffer> > *dstBuffers; diff --git a/media/libmediaplayerservice/nuplayer/NuPlayerRenderer.cpp b/media/libmediaplayerservice/nuplayer/NuPlayerRenderer.cpp index 126c9f4..452c158 100644 --- a/media/libmediaplayerservice/nuplayer/NuPlayerRenderer.cpp +++ b/media/libmediaplayerservice/nuplayer/NuPlayerRenderer.cpp @@ -264,7 +264,7 @@ void NuPlayer::Renderer::onMessageReceived(const sp<AMessage> &msg) {              response->setInt32("err", err);              response->setInt32("offload", offloadingAudio()); -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              response->postReply(replyID); @@ -273,7 +273,7 @@ void NuPlayer::Renderer::onMessageReceived(const sp<AMessage> &msg) {          case kWhatCloseAudioSink:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              onCloseAudioSink(); diff --git a/media/libmediaplayerservice/nuplayer/RTSPSource.cpp b/media/libmediaplayerservice/nuplayer/RTSPSource.cpp index 57d9ab2..5210fc8 100644 --- a/media/libmediaplayerservice/nuplayer/RTSPSource.cpp +++ b/media/libmediaplayerservice/nuplayer/RTSPSource.cpp @@ -311,7 +311,7 @@ void NuPlayer::RTSPSource::performSeek(int64_t seekTimeUs) {  void NuPlayer::RTSPSource::onMessageReceived(const sp<AMessage> &msg) {      if (msg->what() == kWhatDisconnect) { -        uint32_t replyID; +        sp<AReplyToken> replyID;          CHECK(msg->senderAwaitsResponse(&replyID));          mDisconnectReplyID = replyID; diff --git a/media/libmediaplayerservice/nuplayer/RTSPSource.h b/media/libmediaplayerservice/nuplayer/RTSPSource.h index ac3299a..5f2cf33 100644 --- a/media/libmediaplayerservice/nuplayer/RTSPSource.h +++ b/media/libmediaplayerservice/nuplayer/RTSPSource.h @@ -25,6 +25,7 @@  namespace android {  struct ALooper; +struct AReplyToken;  struct AnotherPacketSource;  struct MyHandler;  struct SDPLoader; @@ -96,7 +97,7 @@ private:      bool mIsSDP;      State mState;      status_t mFinalResult; -    uint32_t mDisconnectReplyID; +    sp<AReplyToken> mDisconnectReplyID;      Mutex mBufferingLock;      bool mBuffering; diff --git a/media/libstagefright/MediaCodec.cpp b/media/libstagefright/MediaCodec.cpp index 6282813..0597f1d 100644 --- a/media/libstagefright/MediaCodec.cpp +++ b/media/libstagefright/MediaCodec.cpp @@ -174,7 +174,7 @@ status_t MediaCodec::PostAndAwaitResponse(  }  // static -void MediaCodec::PostReplyWithError(int32_t replyID, int32_t err) { +void MediaCodec::PostReplyWithError(const sp<AReplyToken> &replyID, int32_t err) {      sp<AMessage> response = new AMessage;      response->setInt32("err", err);      response->postReply(replyID); @@ -650,7 +650,7 @@ void MediaCodec::cancelPendingDequeueOperations() {      }  } -bool MediaCodec::handleDequeueInputBuffer(uint32_t replyID, bool newRequest) { +bool MediaCodec::handleDequeueInputBuffer(const sp<AReplyToken> &replyID, bool newRequest) {      if (!isExecuting() || (mFlags & kFlagIsAsync)              || (newRequest && (mFlags & kFlagDequeueInputPending))) {          PostReplyWithError(replyID, INVALID_OPERATION); @@ -674,7 +674,7 @@ bool MediaCodec::handleDequeueInputBuffer(uint32_t replyID, bool newRequest) {      return true;  } -bool MediaCodec::handleDequeueOutputBuffer(uint32_t replyID, bool newRequest) { +bool MediaCodec::handleDequeueOutputBuffer(const sp<AReplyToken> &replyID, bool newRequest) {      sp<AMessage> response = new AMessage;      if (!isExecuting() || (mFlags & kFlagIsAsync) @@ -1198,7 +1198,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatInit:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (mState != UNINITIALIZED) { @@ -1234,7 +1234,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatSetCallback:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (mState == UNINITIALIZED @@ -1266,7 +1266,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatConfigure:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (mState != INITIALIZED) { @@ -1323,7 +1323,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatCreateInputSurface:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              // Must be configured, but can't have been started yet. @@ -1339,7 +1339,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatStart:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (mState == FLUSHED) { @@ -1365,7 +1365,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {              State targetState =                  (msg->what() == kWhatStop) ? INITIALIZED : UNINITIALIZED; -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (!((mFlags & kFlagIsComponentAllocated) && targetState == UNINITIALIZED) // See 1 @@ -1413,7 +1413,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatDequeueInputBuffer:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (mFlags & kFlagIsAsync) { @@ -1474,7 +1474,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatQueueInputBuffer:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (!isExecuting()) { @@ -1493,7 +1493,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatDequeueOutputBuffer:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (mFlags & kFlagIsAsync) { @@ -1548,7 +1548,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatReleaseOutputBuffer:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (!isExecuting()) { @@ -1567,7 +1567,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatSignalEndOfInputStream:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (!isExecuting()) { @@ -1585,7 +1585,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatGetBuffers:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (!isExecuting() || (mFlags & kFlagIsAsync)) { @@ -1619,7 +1619,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatFlush:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (!isExecuting()) { @@ -1645,7 +1645,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {              sp<AMessage> format =                  (msg->what() == kWhatGetOutputFormat ? mOutputFormat : mInputFormat); -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if ((mState != CONFIGURED && mState != STARTING && @@ -1682,7 +1682,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatGetName:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              if (mComponentName.empty()) { @@ -1698,7 +1698,7 @@ void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {          case kWhatSetParameters:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              sp<AMessage> params; diff --git a/media/libstagefright/MediaCodecSource.cpp b/media/libstagefright/MediaCodecSource.cpp index 1ef4170..b6fa810 100644 --- a/media/libstagefright/MediaCodecSource.cpp +++ b/media/libstagefright/MediaCodecSource.cpp @@ -182,7 +182,7 @@ void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {              sp<AMessage> response = new AMessage;              response->setInt32("err", err); -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              response->postReply(replyID);              break; @@ -491,7 +491,7 @@ void MediaCodecSource::signalEOS(status_t err) {      if (mStopping && mEncoderReachedEOS) {          ALOGI("encoder (%s) stopped", mIsVideo ? "video" : "audio");          // posting reply to everyone that's waiting -        List<uint32_t>::iterator it; +        List<sp<AReplyToken>>::iterator it;          for (it = mStopReplyIDQueue.begin();                  it != mStopReplyIDQueue.end(); it++) {              (new AMessage)->postReply(*it); @@ -766,7 +766,7 @@ void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {      }      case kWhatStart:      { -        uint32_t replyID; +        sp<AReplyToken> replyID;          CHECK(msg->senderAwaitsResponse(&replyID));          sp<RefBase> obj; @@ -782,7 +782,7 @@ void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {      {          ALOGI("encoder (%s) stopping", mIsVideo ? "video" : "audio"); -        uint32_t replyID; +        sp<AReplyToken> replyID;          CHECK(msg->senderAwaitsResponse(&replyID));          if (mEncoderReachedEOS) { diff --git a/media/libstagefright/foundation/ALooper.cpp b/media/libstagefright/foundation/ALooper.cpp index 617d32b..90b5f68 100644 --- a/media/libstagefright/foundation/ALooper.cpp +++ b/media/libstagefright/foundation/ALooper.cpp @@ -16,6 +16,9 @@  //#define LOG_NDEBUG 0  #define LOG_TAG "ALooper" + +#include <media/stagefright/foundation/ADebug.h> +  #include <utils/Log.h>  #include <sys/time.h> @@ -220,4 +223,29 @@ bool ALooper::loop() {      return true;  } +// to be called by AMessage::postAndAwaitResponse only +sp<AReplyToken> ALooper::createReplyToken() { +    return new AReplyToken(this); +} + +// to be called by AMessage::postAndAwaitResponse only +status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) { +    // return status in case we want to handle an interrupted wait +    Mutex::Autolock autoLock(mRepliesLock); +    CHECK(replyToken != NULL); +    while (!replyToken->retrieveReply(response)) { +        mRepliesCondition.wait(mRepliesLock); +    } +    return OK; +} + +status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) { +    Mutex::Autolock autoLock(mRepliesLock); +    status_t err = replyToken->setReply(reply); +    if (err == OK) { +        mRepliesCondition.broadcast(); +    } +    return err; +} +  }  // namespace android diff --git a/media/libstagefright/foundation/ALooperRoster.cpp b/media/libstagefright/foundation/ALooperRoster.cpp index c4e6788..473ce1b 100644 --- a/media/libstagefright/foundation/ALooperRoster.cpp +++ b/media/libstagefright/foundation/ALooperRoster.cpp @@ -30,8 +30,7 @@ namespace android {  static bool verboseStats = false;  ALooperRoster::ALooperRoster() -    : mNextHandlerID(1), -      mNextReplyID(1) { +    : mNextHandlerID(1) {  }  ALooper::handler_id ALooperRoster::registerHandler( @@ -100,39 +99,6 @@ void ALooperRoster::unregisterStaleHandlers() {      }  } -status_t ALooperRoster::postAndAwaitResponse( -        const sp<AMessage> &msg, sp<AMessage> *response) { -    Mutex::Autolock autoLock(mLock); - -    uint32_t replyID = mNextReplyID++; - -    msg->setInt32("replyID", replyID); - -    status_t err = msg->post(0 /* delayUs */); -    if (err != OK) { -        response->clear(); -        return err; -    } - -    ssize_t index; -    while ((index = mReplies.indexOfKey(replyID)) < 0) { -        mRepliesCondition.wait(mLock); -    } - -    *response = mReplies.valueAt(index); -    mReplies.removeItemsAt(index); - -    return OK; -} - -void ALooperRoster::postReply(uint32_t replyID, const sp<AMessage> &reply) { -    Mutex::Autolock autoLock(mLock); - -    CHECK(mReplies.indexOfKey(replyID) < 0); -    mReplies.add(replyID, reply); -    mRepliesCondition.broadcast(); -} -  static void makeFourCC(uint32_t fourcc, char *s) {      s[0] = (fourcc >> 24) & 0xff;      if (s[0]) { diff --git a/media/libstagefright/foundation/AMessage.cpp b/media/libstagefright/foundation/AMessage.cpp index d4add15..e549ff6 100644 --- a/media/libstagefright/foundation/AMessage.cpp +++ b/media/libstagefright/foundation/AMessage.cpp @@ -37,6 +37,17 @@ namespace android {  extern ALooperRoster gLooperRoster; +status_t AReplyToken::setReply(const sp<AMessage> &reply) { +    if (mReplied) { +        ALOGE("trying to post a duplicate reply"); +        return -EBUSY; +    } +    CHECK(mReply == NULL); +    mReply = reply; +    mReplied = true; +    return OK; +} +  AMessage::AMessage(void)      : mWhat(0),        mTarget(0), @@ -355,24 +366,50 @@ status_t AMessage::post(int64_t delayUs) {  }  status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) { -    return gLooperRoster.postAndAwaitResponse(this, response); +    sp<ALooper> looper = mLooper.promote(); +    if (looper == NULL) { +        ALOGW("failed to post message as target looper for handler %d is gone.", mTarget); +        return -ENOENT; +    } + +    sp<AReplyToken> token = looper->createReplyToken(); +    if (token == NULL) { +        ALOGE("failed to create reply token"); +        return -ENOMEM; +    } +    setObject("replyID", token); + +    looper->post(this, 0 /* delayUs */); +    return looper->awaitResponse(token, response);  } -void AMessage::postReply(uint32_t replyID) { -    gLooperRoster.postReply(replyID, this); +status_t AMessage::postReply(const sp<AReplyToken> &replyToken) { +    if (replyToken == NULL) { +        ALOGW("failed to post reply to a NULL token"); +        return -ENOENT; +    } +    sp<ALooper> looper = replyToken->getLooper(); +    if (looper == NULL) { +        ALOGW("failed to post reply as target looper is gone."); +        return -ENOENT; +    } +    return looper->postReply(replyToken, this);  } -bool AMessage::senderAwaitsResponse(uint32_t *replyID) const { -    int32_t tmp; -    bool found = findInt32("replyID", &tmp); +bool AMessage::senderAwaitsResponse(sp<AReplyToken> *replyToken) { +    sp<RefBase> tmp; +    bool found = findObject("replyID", &tmp);      if (!found) {          return false;      } -    *replyID = static_cast<uint32_t>(tmp); +    *replyToken = static_cast<AReplyToken *>(tmp.get()); +    tmp.clear(); +    setObject("replyID", tmp); +    // TODO: delete Object instead of setting it to NULL -    return true; +    return *replyToken != NULL;  }  sp<AMessage> AMessage::dup() const { diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp index dcac765..ea4aab6 100644 --- a/media/libstagefright/httplive/LiveSession.cpp +++ b/media/libstagefright/httplive/LiveSession.cpp @@ -402,7 +402,7 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {          case kWhatSeek:          { -            uint32_t seekReplyID; +            sp<AReplyToken> seekReplyID;              CHECK(msg->senderAwaitsResponse(&seekReplyID));              mSeekReplyID = seekReplyID;              mSeekReply = new AMessage; diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h index 2d3a25a..0d7c5e0 100644 --- a/media/libstagefright/httplive/LiveSession.h +++ b/media/libstagefright/httplive/LiveSession.h @@ -26,6 +26,7 @@  namespace android {  struct ABuffer; +struct AReplyToken;  struct AnotherPacketSource;  struct DataSource;  struct HTTPBase; @@ -203,8 +204,8 @@ private:      bool mReconfigurationInProgress;      bool mSwitchInProgress; -    uint32_t mDisconnectReplyID; -    uint32_t mSeekReplyID; +    sp<AReplyToken> mDisconnectReplyID; +    sp<AReplyToken> mSeekReplyID;      bool mFirstTimeUsValid;      int64_t mFirstTimeUs; diff --git a/media/libstagefright/wifi-display/source/MediaPuller.cpp b/media/libstagefright/wifi-display/source/MediaPuller.cpp index 927a53c..ce07a4e 100644 --- a/media/libstagefright/wifi-display/source/MediaPuller.cpp +++ b/media/libstagefright/wifi-display/source/MediaPuller.cpp @@ -105,7 +105,7 @@ void MediaPuller::onMessageReceived(const sp<AMessage> &msg) {              sp<AMessage> response = new AMessage;              response->setInt32("err", err); -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              response->postReply(replyID);              break; diff --git a/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp b/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp index 210914e..14d0951 100644 --- a/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp +++ b/media/libstagefright/wifi-display/source/WifiDisplaySource.cpp @@ -57,7 +57,7 @@ WifiDisplaySource::WifiDisplaySource(        mNetSession(netSession),        mClient(client),        mSessionID(0), -      mStopReplyID(0), +      mStopReplyID(NULL),        mChosenRTPPort(-1),        mUsingPCMAudio(false),        mClientSessionID(0), @@ -138,7 +138,7 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) {      switch (msg->what()) {          case kWhatStart:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              AString iface; @@ -325,7 +325,7 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) {          case kWhatPause:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              status_t err = OK; @@ -345,7 +345,7 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) {          case kWhatResume:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              CHECK(msg->senderAwaitsResponse(&replyID));              status_t err = OK; @@ -492,7 +492,7 @@ void WifiDisplaySource::onMessageReceived(const sp<AMessage> &msg) {              if (mState == AWAITING_CLIENT_TEARDOWN) {                  ALOGI("TEARDOWN trigger timed out, forcing disconnection."); -                CHECK_NE(mStopReplyID, 0); +                CHECK(mStopReplyID != NULL);                  finishStop();                  break;              } @@ -1470,7 +1470,7 @@ status_t WifiDisplaySource::onTeardownRequest(      mNetSession->sendRequest(sessionID, response.c_str());      if (mState == AWAITING_CLIENT_TEARDOWN) { -        CHECK_NE(mStopReplyID, 0); +        CHECK(mStopReplyID != NULL);          finishStop();      } else {          mClient->onDisplayError(IRemoteDisplayClient::kDisplayErrorUnknown); diff --git a/media/libstagefright/wifi-display/source/WifiDisplaySource.h b/media/libstagefright/wifi-display/source/WifiDisplaySource.h index 750265f..0f779e4 100644 --- a/media/libstagefright/wifi-display/source/WifiDisplaySource.h +++ b/media/libstagefright/wifi-display/source/WifiDisplaySource.h @@ -27,6 +27,7 @@  namespace android { +struct AReplyToken;  struct IHDCP;  struct IRemoteDisplayClient;  struct ParsedMessage; @@ -121,7 +122,7 @@ private:      struct in_addr mInterfaceAddr;      int32_t mSessionID; -    uint32_t mStopReplyID; +    sp<AReplyToken> mStopReplyID;      AString mWfdClientRtpPorts;      int32_t mChosenRTPPort;  // extracted from "wfd_client_rtp_ports" diff --git a/media/ndk/NdkMediaCodec.cpp b/media/ndk/NdkMediaCodec.cpp index 46757bc..80c1c2f 100644 --- a/media/ndk/NdkMediaCodec.cpp +++ b/media/ndk/NdkMediaCodec.cpp @@ -116,7 +116,7 @@ void CodecHandler::onMessageReceived(const sp<AMessage> &msg) {          case kWhatStopActivityNotifications:          { -            uint32_t replyID; +            sp<AReplyToken> replyID;              msg->senderAwaitsResponse(&replyID);              mCodec->mGeneration++;  | 
