diff options
author | Steve Block <steveblock@google.com> | 2011-05-18 13:36:51 +0100 |
---|---|---|
committer | Steve Block <steveblock@google.com> | 2011-05-24 15:38:28 +0100 |
commit | 2fc2651226baac27029e38c9d6ef883fa32084db (patch) | |
tree | e396d4bf89dcce6ed02071be66212495b1df1dec /Source/WebKit2/Platform/CoreIPC/Connection.cpp | |
parent | b3725cedeb43722b3b175aaeff70552e562d2c94 (diff) | |
download | external_webkit-2fc2651226baac27029e38c9d6ef883fa32084db.zip external_webkit-2fc2651226baac27029e38c9d6ef883fa32084db.tar.gz external_webkit-2fc2651226baac27029e38c9d6ef883fa32084db.tar.bz2 |
Merge WebKit at r78450: Initial merge by git.
Change-Id: I6d3e5f1f868ec266a0aafdef66182ddc3f265dc1
Diffstat (limited to 'Source/WebKit2/Platform/CoreIPC/Connection.cpp')
-rw-r--r-- | Source/WebKit2/Platform/CoreIPC/Connection.cpp | 325 |
1 files changed, 237 insertions, 88 deletions
diff --git a/Source/WebKit2/Platform/CoreIPC/Connection.cpp b/Source/WebKit2/Platform/CoreIPC/Connection.cpp index da92ce4..5cbd4bc 100644 --- a/Source/WebKit2/Platform/CoreIPC/Connection.cpp +++ b/Source/WebKit2/Platform/CoreIPC/Connection.cpp @@ -23,8 +23,10 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "Connection.h" +#include "BinarySemaphore.h" #include "CoreIPCMessageKinds.h" #include "RunLoop.h" #include "WorkItem.h" @@ -34,6 +36,156 @@ using namespace std; namespace CoreIPC { +class Connection::SyncMessageState : public RefCounted<Connection::SyncMessageState> { +public: + static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*); + ~SyncMessageState(); + + void beginWaitForSyncReply(); + void endWaitForSyncReply(); + + void wakeUpClientRunLoop() + { + m_waitForSyncReplySemaphore.signal(); + } + + bool wait(double absoluteTime) + { + return m_waitForSyncReplySemaphore.wait(absoluteTime); + } + + // Returns true if this message will be handled on a client thread that is currently + // waiting for a reply to a synchronous message. + bool processIncomingMessage(Connection*, IncomingMessage&); + + void dispatchMessages(); + +private: + explicit SyncMessageState(RunLoop*); + + typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap; + static SyncMessageStateMap& syncMessageStateMap() + { + DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ()); + return syncMessageStateMap; + } + + static Mutex& syncMessageStateMapMutex() + { + DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ()); + return syncMessageStateMapMutex; + } + + RunLoop* m_runLoop; + BinarySemaphore m_waitForSyncReplySemaphore; + + // Protects m_waitForSyncReplyCount and m_messagesToDispatchWhileWaitingForSyncReply. + Mutex m_mutex; + + unsigned m_waitForSyncReplyCount; + + struct ConnectionAndIncomingMessage { + Connection* connection; + IncomingMessage incomingMessage; + }; + Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply; +}; + +PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop) +{ + MutexLocker locker(syncMessageStateMapMutex()); + pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0); + + if (!result.second) { + ASSERT(result.first->second); + return result.first->second; + } + + RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop)); + result.first->second = syncMessageState.get(); + + return syncMessageState.release(); +} + +Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop) + : m_runLoop(runLoop) + , m_waitForSyncReplyCount(0) +{ +} + +Connection::SyncMessageState::~SyncMessageState() +{ + MutexLocker locker(syncMessageStateMapMutex()); + + ASSERT(syncMessageStateMap().contains(m_runLoop)); + syncMessageStateMap().remove(m_runLoop); +} + +void Connection::SyncMessageState::beginWaitForSyncReply() +{ + ASSERT(RunLoop::current() == m_runLoop); + + MutexLocker locker(m_mutex); + m_waitForSyncReplyCount++; +} + +void Connection::SyncMessageState::endWaitForSyncReply() +{ + ASSERT(RunLoop::current() == m_runLoop); + + MutexLocker locker(m_mutex); + ASSERT(m_waitForSyncReplyCount); + --m_waitForSyncReplyCount; + + if (m_waitForSyncReplyCount) + return; + + // Dispatch any remaining incoming sync messages. + for (size_t i = 0; i < m_messagesToDispatchWhileWaitingForSyncReply.size(); ++i) { + ConnectionAndIncomingMessage& connectionAndIncomingMessage = m_messagesToDispatchWhileWaitingForSyncReply[i]; + connectionAndIncomingMessage.connection->enqueueIncomingMessage(connectionAndIncomingMessage.incomingMessage); + } + + m_messagesToDispatchWhileWaitingForSyncReply.clear(); +} + +bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage) +{ + MessageID messageID = incomingMessage.messageID(); + if (!messageID.isSync() && !messageID.shouldDispatchMessageWhenWaitingForSyncReply()) + return false; + + MutexLocker locker(m_mutex); + if (!m_waitForSyncReplyCount) + return false; + + ConnectionAndIncomingMessage connectionAndIncomingMessage; + connectionAndIncomingMessage.connection = connection; + connectionAndIncomingMessage.incomingMessage = incomingMessage; + + m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage); + wakeUpClientRunLoop(); + + return true; +} + +void Connection::SyncMessageState::dispatchMessages() +{ + ASSERT(m_runLoop == RunLoop::current()); + + Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply; + + { + MutexLocker locker(m_mutex); + m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply); + } + + for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) { + ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i]; + connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage); + } +} + PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop) { return adoptRef(new Connection(identifier, true, client, clientRunLoop)); @@ -48,11 +200,13 @@ Connection::Connection(Identifier identifier, bool isServer, Client* client, Run : m_client(client) , m_isServer(isServer) , m_syncRequestID(0) + , m_didCloseOnConnectionWorkQueueCallback(0) , m_isConnected(false) , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue") , m_clientRunLoop(clientRunLoop) , m_inDispatchMessageCount(0) , m_didReceiveInvalidMessage(false) + , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop)) , m_shouldWaitForSyncReplies(true) { ASSERT(m_client); @@ -67,6 +221,13 @@ Connection::~Connection() m_connectionQueue.invalidate(); } +void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback) +{ + ASSERT(!m_isConnected); + + m_didCloseOnConnectionWorkQueueCallback = callback; +} + void Connection::invalidate() { if (!isValid()) { @@ -99,11 +260,14 @@ PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_ return argumentEncoder.release(); } -bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments) +bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags) { if (!isValid()) return false; + if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply) + messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply); + MutexLocker locker(m_outgoingMessagesLock); m_outgoingMessages.append(OutgoingMessage(messageID, arguments)); @@ -179,21 +343,27 @@ PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uin // We only allow sending sync messages from the client run loop. ASSERT(RunLoop::current() == m_clientRunLoop); - if (!isValid()) + if (!isValid()) { + m_client->didFailToSendSyncMessage(this); return 0; + } // Push the pending sync reply information on our stack. { MutexLocker locker(m_syncReplyStateMutex); - if (!m_shouldWaitForSyncReplies) + if (!m_shouldWaitForSyncReplies) { + m_client->didFailToSendSyncMessage(this); return 0; + } m_pendingSyncReplies.append(PendingSyncReply(syncRequestID)); } // First send the message. sendMessage(messageID, encoder); - + + m_syncMessageState->beginWaitForSyncReply(); + // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so // keep an extra reference to the connection here in case it's invalidated. RefPtr<Connection> protect(this); @@ -204,22 +374,13 @@ PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uin MutexLocker locker(m_syncReplyStateMutex); ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID); m_pendingSyncReplies.removeLast(); - - if (m_pendingSyncReplies.isEmpty()) { - // This was the bottom-most sendSyncMessage call in the stack. If we have any pending incoming - // sync messages, they need to be dispatched. - if (!m_syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) { - // Add the messages. - MutexLocker locker(m_incomingMessagesLock); - m_incomingMessages.append(m_syncMessagesReceivedWhileWaitingForSyncReply); - m_syncMessagesReceivedWhileWaitingForSyncReply.clear(); - - // Schedule for the messages to be sent. - m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages)); - } - } } - + + m_syncMessageState->endWaitForSyncReply(); + + if (!reply) + m_client->didFailToSendSyncMessage(this); + return reply.release(); } @@ -229,27 +390,12 @@ PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, bool timedOut = false; while (!timedOut) { + // First, check if we have any messages that we need to process. + m_syncMessageState->dispatchMessages(); + { MutexLocker locker(m_syncReplyStateMutex); - // First, check if we have any incoming sync messages that we need to process. - Vector<IncomingMessage> syncMessagesReceivedWhileWaitingForSyncReply; - m_syncMessagesReceivedWhileWaitingForSyncReply.swap(syncMessagesReceivedWhileWaitingForSyncReply); - - if (!syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) { - // Make sure to unlock the mutex here because we're calling out to client code which could in turn send - // another sync message and we don't want that to deadlock. - m_syncReplyStateMutex.unlock(); - - for (size_t i = 0; i < syncMessagesReceivedWhileWaitingForSyncReply.size(); ++i) { - IncomingMessage& message = syncMessagesReceivedWhileWaitingForSyncReply[i]; - OwnPtr<ArgumentDecoder> arguments = message.releaseArguments(); - - dispatchSyncMessage(message.messageID(), arguments.get()); - } - m_syncReplyStateMutex.lock(); - } - // Second, check if there is a sync reply at the top of the stack. ASSERT(!m_pendingSyncReplies.isEmpty()); @@ -262,7 +408,7 @@ PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, } // We didn't find a sync reply yet, keep waiting. - timedOut = !m_waitForSyncReplySemaphore.wait(absoluteTime); + timedOut = !m_syncMessageState->wait(absoluteTime); } // We timed out. @@ -281,42 +427,33 @@ void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<Argument pendingSyncReply.replyDecoder = arguments.leakPtr(); pendingSyncReply.didReceiveReply = true; - - m_waitForSyncReplySemaphore.signal(); + m_syncMessageState->wakeUpClientRunLoop(); return; } - // Check if this is a sync message. If it is, and we're waiting for a sync reply this message - // needs to be dispatched. If we don't we'll end up with a deadlock where both sync message senders are - // stuck waiting for a reply. - if (messageID.isSync()) { - MutexLocker locker(m_syncReplyStateMutex); - if (!m_pendingSyncReplies.isEmpty()) { - m_syncMessagesReceivedWhileWaitingForSyncReply.append(IncomingMessage(messageID, arguments)); + IncomingMessage incomingMessage(messageID, arguments); + + // Check if this is a sync message or if it's a message that should be dispatched even when waiting for + // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched. + // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply. + if (m_syncMessageState->processIncomingMessage(this, incomingMessage)) + return; - // The message has been added, now wake up the client thread. - m_waitForSyncReplySemaphore.signal(); - return; - } - } - // Check if we're waiting for this message. { MutexLocker locker(m_waitForMessageMutex); - HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), arguments->destinationID())); + HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID())); if (it != m_waitForMessageMap.end()) { - it->second = arguments.leakPtr(); + it->second = incomingMessage.releaseArguments().leakPtr(); + ASSERT(it->second); m_waitForMessageCondition.signal(); return; } } - MutexLocker locker(m_incomingMessagesLock); - m_incomingMessages.append(IncomingMessage(messageID, arguments)); - - m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages)); + enqueueIncomingMessage(incomingMessage); } void Connection::connectionDidClose() @@ -331,10 +468,11 @@ void Connection::connectionDidClose() m_shouldWaitForSyncReplies = false; if (!m_pendingSyncReplies.isEmpty()) - m_waitForSyncReplySemaphore.signal(); + m_syncMessageState->wakeUpClientRunLoop(); } - m_client->didCloseOnConnectionWorkQueue(&m_connectionQueue, this); + if (m_didCloseOnConnectionWorkQueueCallback) + m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this); m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose)); } @@ -412,42 +550,53 @@ void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* argum sendSyncReply(replyEncoder); } -void Connection::dispatchMessages() +void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage) { - Vector<IncomingMessage> incomingMessages; - - { - MutexLocker locker(m_incomingMessagesLock); - m_incomingMessages.swap(incomingMessages); - } + MutexLocker locker(m_incomingMessagesLock); + m_incomingMessages.append(incomingMessage); - // Dispatch messages. - for (size_t i = 0; i < incomingMessages.size(); ++i) { - // If someone calls invalidate while we're invalidating messages, we should stop. - if (!m_client) - return; - - IncomingMessage& message = incomingMessages[i]; - OwnPtr<ArgumentDecoder> arguments = message.releaseArguments(); + m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages)); +} + +void Connection::dispatchMessage(IncomingMessage& message) +{ + OwnPtr<ArgumentDecoder> arguments = message.releaseArguments(); + + // If there's no client, return. We do this after calling releaseArguments so that + // the ArgumentDecoder message will be freed. + if (!m_client) + return; - m_inDispatchMessageCount++; + m_inDispatchMessageCount++; - bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage; - m_didReceiveInvalidMessage = false; + bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage; + m_didReceiveInvalidMessage = false; - if (message.messageID().isSync()) - dispatchSyncMessage(message.messageID(), arguments.get()); - else - m_client->didReceiveMessage(this, message.messageID(), arguments.get()); + if (message.messageID().isSync()) + dispatchSyncMessage(message.messageID(), arguments.get()); + else + m_client->didReceiveMessage(this, message.messageID(), arguments.get()); - m_didReceiveInvalidMessage |= arguments->isInvalid(); - m_inDispatchMessageCount--; + m_didReceiveInvalidMessage |= arguments->isInvalid(); + m_inDispatchMessageCount--; - if (m_didReceiveInvalidMessage) - m_client->didReceiveInvalidMessage(this, message.messageID()); + if (m_didReceiveInvalidMessage && m_client) + m_client->didReceiveInvalidMessage(this, message.messageID()); + + m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage; +} - m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage; +void Connection::dispatchMessages() +{ + Vector<IncomingMessage> incomingMessages; + + { + MutexLocker locker(m_incomingMessagesLock); + m_incomingMessages.swap(incomingMessages); } + + for (size_t i = 0; i < incomingMessages.size(); ++i) + dispatchMessage(incomingMessages[i]); } } // namespace CoreIPC |