summaryrefslogtreecommitdiffstats
path: root/Source/WebKit2/Platform/CoreIPC/Connection.cpp
diff options
context:
space:
mode:
authorSteve Block <steveblock@google.com>2011-05-18 13:36:51 +0100
committerSteve Block <steveblock@google.com>2011-05-24 15:38:28 +0100
commit2fc2651226baac27029e38c9d6ef883fa32084db (patch)
treee396d4bf89dcce6ed02071be66212495b1df1dec /Source/WebKit2/Platform/CoreIPC/Connection.cpp
parentb3725cedeb43722b3b175aaeff70552e562d2c94 (diff)
downloadexternal_webkit-2fc2651226baac27029e38c9d6ef883fa32084db.zip
external_webkit-2fc2651226baac27029e38c9d6ef883fa32084db.tar.gz
external_webkit-2fc2651226baac27029e38c9d6ef883fa32084db.tar.bz2
Merge WebKit at r78450: Initial merge by git.
Change-Id: I6d3e5f1f868ec266a0aafdef66182ddc3f265dc1
Diffstat (limited to 'Source/WebKit2/Platform/CoreIPC/Connection.cpp')
-rw-r--r--Source/WebKit2/Platform/CoreIPC/Connection.cpp325
1 files changed, 237 insertions, 88 deletions
diff --git a/Source/WebKit2/Platform/CoreIPC/Connection.cpp b/Source/WebKit2/Platform/CoreIPC/Connection.cpp
index da92ce4..5cbd4bc 100644
--- a/Source/WebKit2/Platform/CoreIPC/Connection.cpp
+++ b/Source/WebKit2/Platform/CoreIPC/Connection.cpp
@@ -23,8 +23,10 @@
* THE POSSIBILITY OF SUCH DAMAGE.
*/
+#include "config.h"
#include "Connection.h"
+#include "BinarySemaphore.h"
#include "CoreIPCMessageKinds.h"
#include "RunLoop.h"
#include "WorkItem.h"
@@ -34,6 +36,156 @@ using namespace std;
namespace CoreIPC {
+class Connection::SyncMessageState : public RefCounted<Connection::SyncMessageState> {
+public:
+ static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*);
+ ~SyncMessageState();
+
+ void beginWaitForSyncReply();
+ void endWaitForSyncReply();
+
+ void wakeUpClientRunLoop()
+ {
+ m_waitForSyncReplySemaphore.signal();
+ }
+
+ bool wait(double absoluteTime)
+ {
+ return m_waitForSyncReplySemaphore.wait(absoluteTime);
+ }
+
+ // Returns true if this message will be handled on a client thread that is currently
+ // waiting for a reply to a synchronous message.
+ bool processIncomingMessage(Connection*, IncomingMessage&);
+
+ void dispatchMessages();
+
+private:
+ explicit SyncMessageState(RunLoop*);
+
+ typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap;
+ static SyncMessageStateMap& syncMessageStateMap()
+ {
+ DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ());
+ return syncMessageStateMap;
+ }
+
+ static Mutex& syncMessageStateMapMutex()
+ {
+ DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ());
+ return syncMessageStateMapMutex;
+ }
+
+ RunLoop* m_runLoop;
+ BinarySemaphore m_waitForSyncReplySemaphore;
+
+ // Protects m_waitForSyncReplyCount and m_messagesToDispatchWhileWaitingForSyncReply.
+ Mutex m_mutex;
+
+ unsigned m_waitForSyncReplyCount;
+
+ struct ConnectionAndIncomingMessage {
+ Connection* connection;
+ IncomingMessage incomingMessage;
+ };
+ Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
+};
+
+PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop)
+{
+ MutexLocker locker(syncMessageStateMapMutex());
+ pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0);
+
+ if (!result.second) {
+ ASSERT(result.first->second);
+ return result.first->second;
+ }
+
+ RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop));
+ result.first->second = syncMessageState.get();
+
+ return syncMessageState.release();
+}
+
+Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop)
+ : m_runLoop(runLoop)
+ , m_waitForSyncReplyCount(0)
+{
+}
+
+Connection::SyncMessageState::~SyncMessageState()
+{
+ MutexLocker locker(syncMessageStateMapMutex());
+
+ ASSERT(syncMessageStateMap().contains(m_runLoop));
+ syncMessageStateMap().remove(m_runLoop);
+}
+
+void Connection::SyncMessageState::beginWaitForSyncReply()
+{
+ ASSERT(RunLoop::current() == m_runLoop);
+
+ MutexLocker locker(m_mutex);
+ m_waitForSyncReplyCount++;
+}
+
+void Connection::SyncMessageState::endWaitForSyncReply()
+{
+ ASSERT(RunLoop::current() == m_runLoop);
+
+ MutexLocker locker(m_mutex);
+ ASSERT(m_waitForSyncReplyCount);
+ --m_waitForSyncReplyCount;
+
+ if (m_waitForSyncReplyCount)
+ return;
+
+ // Dispatch any remaining incoming sync messages.
+ for (size_t i = 0; i < m_messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
+ ConnectionAndIncomingMessage& connectionAndIncomingMessage = m_messagesToDispatchWhileWaitingForSyncReply[i];
+ connectionAndIncomingMessage.connection->enqueueIncomingMessage(connectionAndIncomingMessage.incomingMessage);
+ }
+
+ m_messagesToDispatchWhileWaitingForSyncReply.clear();
+}
+
+bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage)
+{
+ MessageID messageID = incomingMessage.messageID();
+ if (!messageID.isSync() && !messageID.shouldDispatchMessageWhenWaitingForSyncReply())
+ return false;
+
+ MutexLocker locker(m_mutex);
+ if (!m_waitForSyncReplyCount)
+ return false;
+
+ ConnectionAndIncomingMessage connectionAndIncomingMessage;
+ connectionAndIncomingMessage.connection = connection;
+ connectionAndIncomingMessage.incomingMessage = incomingMessage;
+
+ m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage);
+ wakeUpClientRunLoop();
+
+ return true;
+}
+
+void Connection::SyncMessageState::dispatchMessages()
+{
+ ASSERT(m_runLoop == RunLoop::current());
+
+ Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
+
+ {
+ MutexLocker locker(m_mutex);
+ m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
+ }
+
+ for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
+ ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
+ connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage);
+ }
+}
+
PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
{
return adoptRef(new Connection(identifier, true, client, clientRunLoop));
@@ -48,11 +200,13 @@ Connection::Connection(Identifier identifier, bool isServer, Client* client, Run
: m_client(client)
, m_isServer(isServer)
, m_syncRequestID(0)
+ , m_didCloseOnConnectionWorkQueueCallback(0)
, m_isConnected(false)
, m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
, m_clientRunLoop(clientRunLoop)
, m_inDispatchMessageCount(0)
, m_didReceiveInvalidMessage(false)
+ , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
, m_shouldWaitForSyncReplies(true)
{
ASSERT(m_client);
@@ -67,6 +221,13 @@ Connection::~Connection()
m_connectionQueue.invalidate();
}
+void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
+{
+ ASSERT(!m_isConnected);
+
+ m_didCloseOnConnectionWorkQueueCallback = callback;
+}
+
void Connection::invalidate()
{
if (!isValid()) {
@@ -99,11 +260,14 @@ PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_
return argumentEncoder.release();
}
-bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
+bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags)
{
if (!isValid())
return false;
+ if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply)
+ messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply);
+
MutexLocker locker(m_outgoingMessagesLock);
m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
@@ -179,21 +343,27 @@ PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uin
// We only allow sending sync messages from the client run loop.
ASSERT(RunLoop::current() == m_clientRunLoop);
- if (!isValid())
+ if (!isValid()) {
+ m_client->didFailToSendSyncMessage(this);
return 0;
+ }
// Push the pending sync reply information on our stack.
{
MutexLocker locker(m_syncReplyStateMutex);
- if (!m_shouldWaitForSyncReplies)
+ if (!m_shouldWaitForSyncReplies) {
+ m_client->didFailToSendSyncMessage(this);
return 0;
+ }
m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
}
// First send the message.
sendMessage(messageID, encoder);
-
+
+ m_syncMessageState->beginWaitForSyncReply();
+
// Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
// keep an extra reference to the connection here in case it's invalidated.
RefPtr<Connection> protect(this);
@@ -204,22 +374,13 @@ PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uin
MutexLocker locker(m_syncReplyStateMutex);
ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
m_pendingSyncReplies.removeLast();
-
- if (m_pendingSyncReplies.isEmpty()) {
- // This was the bottom-most sendSyncMessage call in the stack. If we have any pending incoming
- // sync messages, they need to be dispatched.
- if (!m_syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) {
- // Add the messages.
- MutexLocker locker(m_incomingMessagesLock);
- m_incomingMessages.append(m_syncMessagesReceivedWhileWaitingForSyncReply);
- m_syncMessagesReceivedWhileWaitingForSyncReply.clear();
-
- // Schedule for the messages to be sent.
- m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
- }
- }
}
-
+
+ m_syncMessageState->endWaitForSyncReply();
+
+ if (!reply)
+ m_client->didFailToSendSyncMessage(this);
+
return reply.release();
}
@@ -229,27 +390,12 @@ PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID,
bool timedOut = false;
while (!timedOut) {
+ // First, check if we have any messages that we need to process.
+ m_syncMessageState->dispatchMessages();
+
{
MutexLocker locker(m_syncReplyStateMutex);
- // First, check if we have any incoming sync messages that we need to process.
- Vector<IncomingMessage> syncMessagesReceivedWhileWaitingForSyncReply;
- m_syncMessagesReceivedWhileWaitingForSyncReply.swap(syncMessagesReceivedWhileWaitingForSyncReply);
-
- if (!syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) {
- // Make sure to unlock the mutex here because we're calling out to client code which could in turn send
- // another sync message and we don't want that to deadlock.
- m_syncReplyStateMutex.unlock();
-
- for (size_t i = 0; i < syncMessagesReceivedWhileWaitingForSyncReply.size(); ++i) {
- IncomingMessage& message = syncMessagesReceivedWhileWaitingForSyncReply[i];
- OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
-
- dispatchSyncMessage(message.messageID(), arguments.get());
- }
- m_syncReplyStateMutex.lock();
- }
-
// Second, check if there is a sync reply at the top of the stack.
ASSERT(!m_pendingSyncReplies.isEmpty());
@@ -262,7 +408,7 @@ PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID,
}
// We didn't find a sync reply yet, keep waiting.
- timedOut = !m_waitForSyncReplySemaphore.wait(absoluteTime);
+ timedOut = !m_syncMessageState->wait(absoluteTime);
}
// We timed out.
@@ -281,42 +427,33 @@ void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<Argument
pendingSyncReply.replyDecoder = arguments.leakPtr();
pendingSyncReply.didReceiveReply = true;
-
- m_waitForSyncReplySemaphore.signal();
+ m_syncMessageState->wakeUpClientRunLoop();
return;
}
- // Check if this is a sync message. If it is, and we're waiting for a sync reply this message
- // needs to be dispatched. If we don't we'll end up with a deadlock where both sync message senders are
- // stuck waiting for a reply.
- if (messageID.isSync()) {
- MutexLocker locker(m_syncReplyStateMutex);
- if (!m_pendingSyncReplies.isEmpty()) {
- m_syncMessagesReceivedWhileWaitingForSyncReply.append(IncomingMessage(messageID, arguments));
+ IncomingMessage incomingMessage(messageID, arguments);
+
+ // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
+ // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
+ // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
+ if (m_syncMessageState->processIncomingMessage(this, incomingMessage))
+ return;
- // The message has been added, now wake up the client thread.
- m_waitForSyncReplySemaphore.signal();
- return;
- }
- }
-
// Check if we're waiting for this message.
{
MutexLocker locker(m_waitForMessageMutex);
- HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), arguments->destinationID()));
+ HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID()));
if (it != m_waitForMessageMap.end()) {
- it->second = arguments.leakPtr();
+ it->second = incomingMessage.releaseArguments().leakPtr();
+ ASSERT(it->second);
m_waitForMessageCondition.signal();
return;
}
}
- MutexLocker locker(m_incomingMessagesLock);
- m_incomingMessages.append(IncomingMessage(messageID, arguments));
-
- m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
+ enqueueIncomingMessage(incomingMessage);
}
void Connection::connectionDidClose()
@@ -331,10 +468,11 @@ void Connection::connectionDidClose()
m_shouldWaitForSyncReplies = false;
if (!m_pendingSyncReplies.isEmpty())
- m_waitForSyncReplySemaphore.signal();
+ m_syncMessageState->wakeUpClientRunLoop();
}
- m_client->didCloseOnConnectionWorkQueue(&m_connectionQueue, this);
+ if (m_didCloseOnConnectionWorkQueueCallback)
+ m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this);
m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
}
@@ -412,42 +550,53 @@ void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* argum
sendSyncReply(replyEncoder);
}
-void Connection::dispatchMessages()
+void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage)
{
- Vector<IncomingMessage> incomingMessages;
-
- {
- MutexLocker locker(m_incomingMessagesLock);
- m_incomingMessages.swap(incomingMessages);
- }
+ MutexLocker locker(m_incomingMessagesLock);
+ m_incomingMessages.append(incomingMessage);
- // Dispatch messages.
- for (size_t i = 0; i < incomingMessages.size(); ++i) {
- // If someone calls invalidate while we're invalidating messages, we should stop.
- if (!m_client)
- return;
-
- IncomingMessage& message = incomingMessages[i];
- OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
+ m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
+}
+
+void Connection::dispatchMessage(IncomingMessage& message)
+{
+ OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
+
+ // If there's no client, return. We do this after calling releaseArguments so that
+ // the ArgumentDecoder message will be freed.
+ if (!m_client)
+ return;
- m_inDispatchMessageCount++;
+ m_inDispatchMessageCount++;
- bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
- m_didReceiveInvalidMessage = false;
+ bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
+ m_didReceiveInvalidMessage = false;
- if (message.messageID().isSync())
- dispatchSyncMessage(message.messageID(), arguments.get());
- else
- m_client->didReceiveMessage(this, message.messageID(), arguments.get());
+ if (message.messageID().isSync())
+ dispatchSyncMessage(message.messageID(), arguments.get());
+ else
+ m_client->didReceiveMessage(this, message.messageID(), arguments.get());
- m_didReceiveInvalidMessage |= arguments->isInvalid();
- m_inDispatchMessageCount--;
+ m_didReceiveInvalidMessage |= arguments->isInvalid();
+ m_inDispatchMessageCount--;
- if (m_didReceiveInvalidMessage)
- m_client->didReceiveInvalidMessage(this, message.messageID());
+ if (m_didReceiveInvalidMessage && m_client)
+ m_client->didReceiveInvalidMessage(this, message.messageID());
+
+ m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
+}
- m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
+void Connection::dispatchMessages()
+{
+ Vector<IncomingMessage> incomingMessages;
+
+ {
+ MutexLocker locker(m_incomingMessagesLock);
+ m_incomingMessages.swap(incomingMessages);
}
+
+ for (size_t i = 0; i < incomingMessages.size(); ++i)
+ dispatchMessage(incomingMessages[i]);
}
} // namespace CoreIPC