diff options
author | Steve Block <steveblock@google.com> | 2011-05-25 08:15:24 -0700 |
---|---|---|
committer | Android (Google) Code Review <android-gerrit@google.com> | 2011-05-25 08:15:24 -0700 |
commit | fa91a01aee5d4a80ca6c80f722116b850f09996c (patch) | |
tree | f72740e60d3c3d4f0ab144e88c03d1f134944ce3 /Source/WebKit2/Platform/CoreIPC | |
parent | 96f37d6d1b390f6690858789706ee6ec25bc1677 (diff) | |
parent | feebf8e7a79ad68b04a1a948e2b8078d6e5f0048 (diff) | |
download | external_webkit-fa91a01aee5d4a80ca6c80f722116b850f09996c.zip external_webkit-fa91a01aee5d4a80ca6c80f722116b850f09996c.tar.gz external_webkit-fa91a01aee5d4a80ca6c80f722116b850f09996c.tar.bz2 |
Merge changes I78ff6a85,Ic85c6405,Ibf903baa,I3a0459db,I35140385,I54790419,I6bfe5d24,Ia9f39b83,I5bcecd5a,I1de96683,I543c6810,I8a5b0878,I0ae670bf,Ide4d58dc,I28ebaf3d,I499d6631,Ie5090e0d,I6d3e5f1f
* changes:
Merge WebKit at r78450: Update ThirdPartyProject.prop
Merge WebKit at r78450: Add new Font::canExpandAroundIdeographsInComplexText()
Merge WebKit at r78450: Add new ChromeClient::selectItemAlignmentFollowsMenuWritingDirection()
Merge WebKit at r78450: FrameLoaderClient::didRunInsecureContent() signature changed
Merge WebKit at r78450: HTMLAreaElement::getRect() renamed
Merge WebKit at r78450: FrameLoader::url() removed
Merge WebKit at r78450: HTMLParserQuirks removed
Merge WebKit at r78450: TextRun::padding() renamed
Merge WebKit at r78450: Use new FontMetrics
Merge WebKit at r78450: GraphicsContext current path removed
Merge WebKit at r78450: TransformationMatrix multiply methods renamed and meaning changed
Merge WebKit at r78450: FontCustomPlatformData::fontPlatformData() signature changed
Merge WebKit at r78450: IntRect::bottom()/right() renamed
Merge WebKit at r78450: Fix remaining conflicts
Merge WebKit at r78450: Fix conflicts due to new ENABLE_WEB_ARCHIVE guard
Merge WebKit at r78450: Fix conflicts in media controls
Merge WebKit at r78450: Fix Makefiles
Merge WebKit at r78450: Initial merge by git.
Diffstat (limited to 'Source/WebKit2/Platform/CoreIPC')
17 files changed, 801 insertions, 182 deletions
diff --git a/Source/WebKit2/Platform/CoreIPC/ArgumentDecoder.cpp b/Source/WebKit2/Platform/CoreIPC/ArgumentDecoder.cpp index 336f72f..4664806 100644 --- a/Source/WebKit2/Platform/CoreIPC/ArgumentDecoder.cpp +++ b/Source/WebKit2/Platform/CoreIPC/ArgumentDecoder.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2010 Apple Inc. All rights reserved. + * Copyright (C) 2010, 2011 Apple Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -23,6 +23,7 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "ArgumentDecoder.h" #include "DataReference.h" @@ -46,7 +47,13 @@ ArgumentDecoder::~ArgumentDecoder() { ASSERT(m_buffer); fastFree(m_buffer); +#if !PLATFORM(QT) // FIXME: We need to dispose of the mach ports in cases of failure. +#else + Deque<Attachment>::iterator end = m_attachments.end(); + for (Deque<Attachment>::iterator it = m_attachments.begin(); it != end; ++it) + it->dispose(); +#endif } void ArgumentDecoder::initialize(const uint8_t* buffer, size_t bufferSize) @@ -62,13 +69,15 @@ void ArgumentDecoder::initialize(const uint8_t* buffer, size_t bufferSize) static inline uint8_t* roundUpToAlignment(uint8_t* ptr, unsigned alignment) { - return (uint8_t*)(((uintptr_t)ptr + alignment - 1) & ~(uintptr_t)(alignment - 1)); + ASSERT(alignment); + uintptr_t alignmentMask = alignment - 1; + return reinterpret_cast<uint8_t*>((reinterpret_cast<uintptr_t>(ptr) + alignmentMask) & ~alignmentMask); } bool ArgumentDecoder::alignBufferPosition(unsigned alignment, size_t size) { uint8_t* buffer = roundUpToAlignment(m_bufferPos, alignment); - if (buffer + size > m_bufferEnd) { + if (static_cast<size_t>(m_bufferEnd - buffer) < size) { // We've walked off the end of this buffer. markInvalid(); return false; @@ -80,7 +89,7 @@ bool ArgumentDecoder::alignBufferPosition(unsigned alignment, size_t size) bool ArgumentDecoder::bufferIsLargeEnoughToContain(unsigned alignment, size_t size) const { - return roundUpToAlignment(m_bufferPos, alignment) + size <= m_bufferEnd; + return static_cast<size_t>(m_bufferEnd - roundUpToAlignment(m_bufferPos, alignment)) >= size; } bool ArgumentDecoder::decodeBytes(Vector<uint8_t>& buffer) diff --git a/Source/WebKit2/Platform/CoreIPC/ArgumentEncoder.cpp b/Source/WebKit2/Platform/CoreIPC/ArgumentEncoder.cpp index 1340c0a..aa71b0f 100644 --- a/Source/WebKit2/Platform/CoreIPC/ArgumentEncoder.cpp +++ b/Source/WebKit2/Platform/CoreIPC/ArgumentEncoder.cpp @@ -23,6 +23,7 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "ArgumentEncoder.h" #include <algorithm> @@ -49,7 +50,12 @@ ArgumentEncoder::~ArgumentEncoder() { if (m_buffer) fastFree(m_buffer); +#if !PLATFORM(QT) // FIXME: We need to dispose of the attachments in cases of failure. +#else + for (int i = 0; i < m_attachments.size(); ++i) + m_attachments[i].dispose(); +#endif } static inline size_t roundUpToAlignment(size_t value, unsigned alignment) diff --git a/Source/WebKit2/Platform/CoreIPC/Attachment.cpp b/Source/WebKit2/Platform/CoreIPC/Attachment.cpp index c8d35f4..646b64c 100644 --- a/Source/WebKit2/Platform/CoreIPC/Attachment.cpp +++ b/Source/WebKit2/Platform/CoreIPC/Attachment.cpp @@ -23,6 +23,7 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "Attachment.h" #include "ArgumentDecoder.h" diff --git a/Source/WebKit2/Platform/CoreIPC/Attachment.h b/Source/WebKit2/Platform/CoreIPC/Attachment.h index 55a09c9..c057714 100644 --- a/Source/WebKit2/Platform/CoreIPC/Attachment.h +++ b/Source/WebKit2/Platform/CoreIPC/Attachment.h @@ -40,12 +40,16 @@ public: #if PLATFORM(MAC) MachPortType, MachOOLMemoryType +#elif PLATFORM(QT) + MappedMemory #endif }; #if PLATFORM(MAC) Attachment(mach_port_name_t port, mach_msg_type_name_t disposition); Attachment(void* address, mach_msg_size_t size, mach_msg_copy_options_t copyOptions, bool deallocate); +#elif PLATFORM(QT) + Attachment(int fileDescriptor, size_t); #endif Type type() const { return m_type; } @@ -62,6 +66,13 @@ public: mach_msg_size_t size() const { ASSERT(m_type == MachOOLMemoryType); return m_oolMemory.size; } mach_msg_copy_options_t copyOptions() const { ASSERT(m_type == MachOOLMemoryType); return m_oolMemory.copyOptions; } bool deallocate() const { ASSERT(m_type == MachOOLMemoryType); return m_oolMemory.deallocate; } +#elif PLATFORM(QT) + size_t size() const { return m_size; } + + int releaseFileDescriptor() { int temp = m_fileDescriptor; m_fileDescriptor = -1; return temp; } + int fileDescriptor() const { return m_fileDescriptor; } + + void dispose(); #endif void encode(ArgumentEncoder*) const; @@ -83,6 +94,9 @@ private: bool deallocate; } m_oolMemory; }; +#elif PLATFORM(QT) + int m_fileDescriptor; + size_t m_size; #endif }; diff --git a/Source/WebKit2/Platform/CoreIPC/BinarySemaphore.cpp b/Source/WebKit2/Platform/CoreIPC/BinarySemaphore.cpp index c975dff..d4d9e7d 100644 --- a/Source/WebKit2/Platform/CoreIPC/BinarySemaphore.cpp +++ b/Source/WebKit2/Platform/CoreIPC/BinarySemaphore.cpp @@ -23,10 +23,13 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "BinarySemaphore.h" namespace CoreIPC { +#if !PLATFORM(WIN) + BinarySemaphore::BinarySemaphore() : m_isSet(false) { @@ -60,5 +63,6 @@ bool BinarySemaphore::wait(double absoluteTime) return true; } +#endif // !PLATFORM(WIN) } // namespace CoreIPC diff --git a/Source/WebKit2/Platform/CoreIPC/BinarySemaphore.h b/Source/WebKit2/Platform/CoreIPC/BinarySemaphore.h index 8113236..32b5b02 100644 --- a/Source/WebKit2/Platform/CoreIPC/BinarySemaphore.h +++ b/Source/WebKit2/Platform/CoreIPC/BinarySemaphore.h @@ -42,10 +42,14 @@ public: bool wait(double absoluteTime); private: +#if PLATFORM(WIN) + HANDLE m_event; +#else bool m_isSet; Mutex m_mutex; ThreadCondition m_condition; +#endif }; } // namespace CoreIPC diff --git a/Source/WebKit2/Platform/CoreIPC/Connection.cpp b/Source/WebKit2/Platform/CoreIPC/Connection.cpp index da92ce4..5cbd4bc 100644 --- a/Source/WebKit2/Platform/CoreIPC/Connection.cpp +++ b/Source/WebKit2/Platform/CoreIPC/Connection.cpp @@ -23,8 +23,10 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "Connection.h" +#include "BinarySemaphore.h" #include "CoreIPCMessageKinds.h" #include "RunLoop.h" #include "WorkItem.h" @@ -34,6 +36,156 @@ using namespace std; namespace CoreIPC { +class Connection::SyncMessageState : public RefCounted<Connection::SyncMessageState> { +public: + static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*); + ~SyncMessageState(); + + void beginWaitForSyncReply(); + void endWaitForSyncReply(); + + void wakeUpClientRunLoop() + { + m_waitForSyncReplySemaphore.signal(); + } + + bool wait(double absoluteTime) + { + return m_waitForSyncReplySemaphore.wait(absoluteTime); + } + + // Returns true if this message will be handled on a client thread that is currently + // waiting for a reply to a synchronous message. + bool processIncomingMessage(Connection*, IncomingMessage&); + + void dispatchMessages(); + +private: + explicit SyncMessageState(RunLoop*); + + typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap; + static SyncMessageStateMap& syncMessageStateMap() + { + DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ()); + return syncMessageStateMap; + } + + static Mutex& syncMessageStateMapMutex() + { + DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ()); + return syncMessageStateMapMutex; + } + + RunLoop* m_runLoop; + BinarySemaphore m_waitForSyncReplySemaphore; + + // Protects m_waitForSyncReplyCount and m_messagesToDispatchWhileWaitingForSyncReply. + Mutex m_mutex; + + unsigned m_waitForSyncReplyCount; + + struct ConnectionAndIncomingMessage { + Connection* connection; + IncomingMessage incomingMessage; + }; + Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply; +}; + +PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop) +{ + MutexLocker locker(syncMessageStateMapMutex()); + pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0); + + if (!result.second) { + ASSERT(result.first->second); + return result.first->second; + } + + RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop)); + result.first->second = syncMessageState.get(); + + return syncMessageState.release(); +} + +Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop) + : m_runLoop(runLoop) + , m_waitForSyncReplyCount(0) +{ +} + +Connection::SyncMessageState::~SyncMessageState() +{ + MutexLocker locker(syncMessageStateMapMutex()); + + ASSERT(syncMessageStateMap().contains(m_runLoop)); + syncMessageStateMap().remove(m_runLoop); +} + +void Connection::SyncMessageState::beginWaitForSyncReply() +{ + ASSERT(RunLoop::current() == m_runLoop); + + MutexLocker locker(m_mutex); + m_waitForSyncReplyCount++; +} + +void Connection::SyncMessageState::endWaitForSyncReply() +{ + ASSERT(RunLoop::current() == m_runLoop); + + MutexLocker locker(m_mutex); + ASSERT(m_waitForSyncReplyCount); + --m_waitForSyncReplyCount; + + if (m_waitForSyncReplyCount) + return; + + // Dispatch any remaining incoming sync messages. + for (size_t i = 0; i < m_messagesToDispatchWhileWaitingForSyncReply.size(); ++i) { + ConnectionAndIncomingMessage& connectionAndIncomingMessage = m_messagesToDispatchWhileWaitingForSyncReply[i]; + connectionAndIncomingMessage.connection->enqueueIncomingMessage(connectionAndIncomingMessage.incomingMessage); + } + + m_messagesToDispatchWhileWaitingForSyncReply.clear(); +} + +bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage) +{ + MessageID messageID = incomingMessage.messageID(); + if (!messageID.isSync() && !messageID.shouldDispatchMessageWhenWaitingForSyncReply()) + return false; + + MutexLocker locker(m_mutex); + if (!m_waitForSyncReplyCount) + return false; + + ConnectionAndIncomingMessage connectionAndIncomingMessage; + connectionAndIncomingMessage.connection = connection; + connectionAndIncomingMessage.incomingMessage = incomingMessage; + + m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage); + wakeUpClientRunLoop(); + + return true; +} + +void Connection::SyncMessageState::dispatchMessages() +{ + ASSERT(m_runLoop == RunLoop::current()); + + Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply; + + { + MutexLocker locker(m_mutex); + m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply); + } + + for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) { + ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i]; + connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage); + } +} + PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop) { return adoptRef(new Connection(identifier, true, client, clientRunLoop)); @@ -48,11 +200,13 @@ Connection::Connection(Identifier identifier, bool isServer, Client* client, Run : m_client(client) , m_isServer(isServer) , m_syncRequestID(0) + , m_didCloseOnConnectionWorkQueueCallback(0) , m_isConnected(false) , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue") , m_clientRunLoop(clientRunLoop) , m_inDispatchMessageCount(0) , m_didReceiveInvalidMessage(false) + , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop)) , m_shouldWaitForSyncReplies(true) { ASSERT(m_client); @@ -67,6 +221,13 @@ Connection::~Connection() m_connectionQueue.invalidate(); } +void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback) +{ + ASSERT(!m_isConnected); + + m_didCloseOnConnectionWorkQueueCallback = callback; +} + void Connection::invalidate() { if (!isValid()) { @@ -99,11 +260,14 @@ PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_ return argumentEncoder.release(); } -bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments) +bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags) { if (!isValid()) return false; + if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply) + messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply); + MutexLocker locker(m_outgoingMessagesLock); m_outgoingMessages.append(OutgoingMessage(messageID, arguments)); @@ -179,21 +343,27 @@ PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uin // We only allow sending sync messages from the client run loop. ASSERT(RunLoop::current() == m_clientRunLoop); - if (!isValid()) + if (!isValid()) { + m_client->didFailToSendSyncMessage(this); return 0; + } // Push the pending sync reply information on our stack. { MutexLocker locker(m_syncReplyStateMutex); - if (!m_shouldWaitForSyncReplies) + if (!m_shouldWaitForSyncReplies) { + m_client->didFailToSendSyncMessage(this); return 0; + } m_pendingSyncReplies.append(PendingSyncReply(syncRequestID)); } // First send the message. sendMessage(messageID, encoder); - + + m_syncMessageState->beginWaitForSyncReply(); + // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so // keep an extra reference to the connection here in case it's invalidated. RefPtr<Connection> protect(this); @@ -204,22 +374,13 @@ PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uin MutexLocker locker(m_syncReplyStateMutex); ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID); m_pendingSyncReplies.removeLast(); - - if (m_pendingSyncReplies.isEmpty()) { - // This was the bottom-most sendSyncMessage call in the stack. If we have any pending incoming - // sync messages, they need to be dispatched. - if (!m_syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) { - // Add the messages. - MutexLocker locker(m_incomingMessagesLock); - m_incomingMessages.append(m_syncMessagesReceivedWhileWaitingForSyncReply); - m_syncMessagesReceivedWhileWaitingForSyncReply.clear(); - - // Schedule for the messages to be sent. - m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages)); - } - } } - + + m_syncMessageState->endWaitForSyncReply(); + + if (!reply) + m_client->didFailToSendSyncMessage(this); + return reply.release(); } @@ -229,27 +390,12 @@ PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, bool timedOut = false; while (!timedOut) { + // First, check if we have any messages that we need to process. + m_syncMessageState->dispatchMessages(); + { MutexLocker locker(m_syncReplyStateMutex); - // First, check if we have any incoming sync messages that we need to process. - Vector<IncomingMessage> syncMessagesReceivedWhileWaitingForSyncReply; - m_syncMessagesReceivedWhileWaitingForSyncReply.swap(syncMessagesReceivedWhileWaitingForSyncReply); - - if (!syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) { - // Make sure to unlock the mutex here because we're calling out to client code which could in turn send - // another sync message and we don't want that to deadlock. - m_syncReplyStateMutex.unlock(); - - for (size_t i = 0; i < syncMessagesReceivedWhileWaitingForSyncReply.size(); ++i) { - IncomingMessage& message = syncMessagesReceivedWhileWaitingForSyncReply[i]; - OwnPtr<ArgumentDecoder> arguments = message.releaseArguments(); - - dispatchSyncMessage(message.messageID(), arguments.get()); - } - m_syncReplyStateMutex.lock(); - } - // Second, check if there is a sync reply at the top of the stack. ASSERT(!m_pendingSyncReplies.isEmpty()); @@ -262,7 +408,7 @@ PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, } // We didn't find a sync reply yet, keep waiting. - timedOut = !m_waitForSyncReplySemaphore.wait(absoluteTime); + timedOut = !m_syncMessageState->wait(absoluteTime); } // We timed out. @@ -281,42 +427,33 @@ void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<Argument pendingSyncReply.replyDecoder = arguments.leakPtr(); pendingSyncReply.didReceiveReply = true; - - m_waitForSyncReplySemaphore.signal(); + m_syncMessageState->wakeUpClientRunLoop(); return; } - // Check if this is a sync message. If it is, and we're waiting for a sync reply this message - // needs to be dispatched. If we don't we'll end up with a deadlock where both sync message senders are - // stuck waiting for a reply. - if (messageID.isSync()) { - MutexLocker locker(m_syncReplyStateMutex); - if (!m_pendingSyncReplies.isEmpty()) { - m_syncMessagesReceivedWhileWaitingForSyncReply.append(IncomingMessage(messageID, arguments)); + IncomingMessage incomingMessage(messageID, arguments); + + // Check if this is a sync message or if it's a message that should be dispatched even when waiting for + // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched. + // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply. + if (m_syncMessageState->processIncomingMessage(this, incomingMessage)) + return; - // The message has been added, now wake up the client thread. - m_waitForSyncReplySemaphore.signal(); - return; - } - } - // Check if we're waiting for this message. { MutexLocker locker(m_waitForMessageMutex); - HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), arguments->destinationID())); + HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID())); if (it != m_waitForMessageMap.end()) { - it->second = arguments.leakPtr(); + it->second = incomingMessage.releaseArguments().leakPtr(); + ASSERT(it->second); m_waitForMessageCondition.signal(); return; } } - MutexLocker locker(m_incomingMessagesLock); - m_incomingMessages.append(IncomingMessage(messageID, arguments)); - - m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages)); + enqueueIncomingMessage(incomingMessage); } void Connection::connectionDidClose() @@ -331,10 +468,11 @@ void Connection::connectionDidClose() m_shouldWaitForSyncReplies = false; if (!m_pendingSyncReplies.isEmpty()) - m_waitForSyncReplySemaphore.signal(); + m_syncMessageState->wakeUpClientRunLoop(); } - m_client->didCloseOnConnectionWorkQueue(&m_connectionQueue, this); + if (m_didCloseOnConnectionWorkQueueCallback) + m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this); m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose)); } @@ -412,42 +550,53 @@ void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* argum sendSyncReply(replyEncoder); } -void Connection::dispatchMessages() +void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage) { - Vector<IncomingMessage> incomingMessages; - - { - MutexLocker locker(m_incomingMessagesLock); - m_incomingMessages.swap(incomingMessages); - } + MutexLocker locker(m_incomingMessagesLock); + m_incomingMessages.append(incomingMessage); - // Dispatch messages. - for (size_t i = 0; i < incomingMessages.size(); ++i) { - // If someone calls invalidate while we're invalidating messages, we should stop. - if (!m_client) - return; - - IncomingMessage& message = incomingMessages[i]; - OwnPtr<ArgumentDecoder> arguments = message.releaseArguments(); + m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages)); +} + +void Connection::dispatchMessage(IncomingMessage& message) +{ + OwnPtr<ArgumentDecoder> arguments = message.releaseArguments(); + + // If there's no client, return. We do this after calling releaseArguments so that + // the ArgumentDecoder message will be freed. + if (!m_client) + return; - m_inDispatchMessageCount++; + m_inDispatchMessageCount++; - bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage; - m_didReceiveInvalidMessage = false; + bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage; + m_didReceiveInvalidMessage = false; - if (message.messageID().isSync()) - dispatchSyncMessage(message.messageID(), arguments.get()); - else - m_client->didReceiveMessage(this, message.messageID(), arguments.get()); + if (message.messageID().isSync()) + dispatchSyncMessage(message.messageID(), arguments.get()); + else + m_client->didReceiveMessage(this, message.messageID(), arguments.get()); - m_didReceiveInvalidMessage |= arguments->isInvalid(); - m_inDispatchMessageCount--; + m_didReceiveInvalidMessage |= arguments->isInvalid(); + m_inDispatchMessageCount--; - if (m_didReceiveInvalidMessage) - m_client->didReceiveInvalidMessage(this, message.messageID()); + if (m_didReceiveInvalidMessage && m_client) + m_client->didReceiveInvalidMessage(this, message.messageID()); + + m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage; +} - m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage; +void Connection::dispatchMessages() +{ + Vector<IncomingMessage> incomingMessages; + + { + MutexLocker locker(m_incomingMessagesLock); + m_incomingMessages.swap(incomingMessages); } + + for (size_t i = 0; i < incomingMessages.size(); ++i) + dispatchMessage(incomingMessages[i]); } } // namespace CoreIPC diff --git a/Source/WebKit2/Platform/CoreIPC/Connection.h b/Source/WebKit2/Platform/CoreIPC/Connection.h index 1b009cf..eaa2ab9 100644 --- a/Source/WebKit2/Platform/CoreIPC/Connection.h +++ b/Source/WebKit2/Platform/CoreIPC/Connection.h @@ -31,7 +31,6 @@ #include "ArgumentDecoder.h" #include "ArgumentEncoder.h" #include "Arguments.h" -#include "BinarySemaphore.h" #include "MessageID.h" #include "WorkQueue.h" #include <wtf/HashMap.h> @@ -44,9 +43,8 @@ #elif PLATFORM(WIN) #include <string> #elif PLATFORM(QT) -#include <QString> -class QLocalServer; -class QLocalSocket; +class QSocketNotifier; +#include "PlatformProcessIdentifier.h" #endif class RunLoop; @@ -60,6 +58,12 @@ enum SyncReplyMode { ManualReply }; +enum MessageSendFlags { + // Whether this message should be dispatched when waiting for a sync reply. + // This is the default for synchronous messages. + DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0, +}; + #define MESSAGE_CHECK_BASE(assertion, connection) do \ if (!(assertion)) { \ ASSERT(assertion); \ @@ -86,10 +90,7 @@ public: public: virtual void didClose(Connection*) = 0; virtual void didReceiveInvalidMessage(Connection*, MessageID) = 0; - - // Called on the connection work queue when the connection is closed, before - // didCall is called on the client thread. - virtual void didCloseOnConnectionWorkQueue(WorkQueue*, Connection*) { } + virtual void didFailToSendSyncMessage(Connection*) { } }; #if PLATFORM(MAC) @@ -98,7 +99,7 @@ public: typedef HANDLE Identifier; static bool createServerAndClientIdentifiers(Identifier& serverIdentifier, Identifier& clientIdentifier); #elif PLATFORM(QT) - typedef const QString Identifier; + typedef int Identifier; #elif PLATFORM(GTK) typedef int Identifier; #endif @@ -109,27 +110,37 @@ public: #if PLATFORM(MAC) void setShouldCloseConnectionOnMachExceptions(); +#elif PLATFORM(QT) + void setShouldCloseConnectionOnProcessTermination(WebKit::PlatformProcessIdentifier); #endif + // The set callback will be called on the connection work queue when the connection is closed, + // before didCall is called on the client thread. Must be called before the connection is opened. + // In the future we might want a more generic way to handle sync or async messages directly + // on the work queue, for example if we want to handle them on some other thread we could avoid + // handling the message on the client thread first. + typedef void (*DidCloseOnConnectionWorkQueueCallback)(WorkQueue&, Connection*); + void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback); + bool open(); void invalidate(); void markCurrentlyDispatchedMessageAsInvalid(); static const unsigned long long NoTimeout = 10000000000ULL; - template<typename T> bool send(const T& message, uint64_t destinationID); + template<typename T> bool send(const T& message, uint64_t destinationID, unsigned messageSendFlags = 0); template<typename T> bool sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout = NoTimeout); template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, double timeout); PassOwnPtr<ArgumentEncoder> createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID); - bool sendMessage(MessageID, PassOwnPtr<ArgumentEncoder>); + bool sendMessage(MessageID, PassOwnPtr<ArgumentEncoder>, unsigned messageSendFlags = 0); bool sendSyncReply(PassOwnPtr<ArgumentEncoder>); - // FIXME: These variants of senc, sendSync and waitFor are all deprecated. + // FIXME: These variants of send, sendSync and waitFor are all deprecated. // All clients should move to the overloads that take a message type. - template<typename E, typename T> bool send(E messageID, uint64_t destinationID, const T& arguments); - template<typename E, typename T, typename U> bool sendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout = NoTimeout); - template<typename E> PassOwnPtr<ArgumentDecoder> waitFor(E messageID, uint64_t destinationID, double timeout); + template<typename E, typename T> bool deprecatedSend(E messageID, uint64_t destinationID, const T& arguments); + template<typename E, typename T, typename U> bool deprecatedSendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout = NoTimeout); + template<typename E> PassOwnPtr<ArgumentDecoder> deprecatedWaitFor(E messageID, uint64_t destinationID, double timeout); private: template<typename T> class Message { @@ -146,6 +157,8 @@ private: } MessageID messageID() const { return m_messageID; } + uint64_t destinationID() const { return m_arguments->destinationID(); } + T* arguments() const { return m_arguments; } PassOwnPtr<T> releaseArguments() @@ -184,15 +197,23 @@ private: bool sendOutgoingMessage(MessageID, PassOwnPtr<ArgumentEncoder>); void connectionDidClose(); + typedef Message<ArgumentDecoder> IncomingMessage; + // Called on the listener thread. void dispatchConnectionDidClose(); + void dispatchMessage(IncomingMessage&); void dispatchMessages(); void dispatchSyncMessage(MessageID, ArgumentDecoder*); - + + // Can be called on any thread. + void enqueueIncomingMessage(IncomingMessage&); + Client* m_client; bool m_isServer; uint64_t m_syncRequestID; + DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback; + bool m_isConnected; WorkQueue m_connectionQueue; RunLoop* m_clientRunLoop; @@ -201,8 +222,6 @@ private: bool m_didReceiveInvalidMessage; // Incoming messages. - typedef Message<ArgumentDecoder> IncomingMessage; - Mutex m_incomingMessagesLock; Vector<IncomingMessage> m_incomingMessages; @@ -249,12 +268,13 @@ private: } }; - BinarySemaphore m_waitForSyncReplySemaphore; + class SyncMessageState; + friend class SyncMessageState; + RefPtr<SyncMessageState> m_syncMessageState; Mutex m_syncReplyStateMutex; bool m_shouldWaitForSyncReplies; Vector<PendingSyncReply> m_pendingSyncReplies; - Vector<IncomingMessage> m_syncMessagesReceivedWhileWaitingForSyncReply; #if PLATFORM(MAC) // Called on the connection queue. @@ -285,8 +305,8 @@ private: Vector<uint8_t> m_readBuffer; size_t m_currentMessageSize; - QLocalSocket* m_socket; - QString m_serverName; + QSocketNotifier* m_socketNotifier; + int m_socketDescriptor; #elif PLATFORM(GTK) void readEventHandler(); void processCompletedMessage(); @@ -299,12 +319,12 @@ private: #endif }; -template<typename T> bool Connection::send(const T& message, uint64_t destinationID) +template<typename T> bool Connection::send(const T& message, uint64_t destinationID, unsigned messageSendFlags) { OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID); argumentEncoder->encode(message); - return sendMessage(MessageID(T::messageID), argumentEncoder.release()); + return sendMessage(MessageID(T::messageID), argumentEncoder.release(), messageSendFlags); } template<typename T> bool Connection::sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout) @@ -338,7 +358,7 @@ template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t des // These three member functions are all deprecated. template<typename E, typename T, typename U> -inline bool Connection::sendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout) +inline bool Connection::deprecatedSendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout) { uint64_t syncRequestID = 0; OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID); @@ -356,7 +376,7 @@ inline bool Connection::sendSync(E messageID, uint64_t destinationID, const T& a } template<typename E, typename T> -bool Connection::send(E messageID, uint64_t destinationID, const T& arguments) +bool Connection::deprecatedSend(E messageID, uint64_t destinationID, const T& arguments) { OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID); argumentEncoder->encode(arguments); @@ -364,7 +384,7 @@ bool Connection::send(E messageID, uint64_t destinationID, const T& arguments) return sendMessage(MessageID(messageID), argumentEncoder.release()); } -template<typename E> inline PassOwnPtr<ArgumentDecoder> Connection::waitFor(E messageID, uint64_t destinationID, double timeout) +template<typename E> inline PassOwnPtr<ArgumentDecoder> Connection::deprecatedWaitFor(E messageID, uint64_t destinationID, double timeout) { return waitForMessage(MessageID(messageID), destinationID, timeout); } diff --git a/Source/WebKit2/Platform/CoreIPC/DataReference.cpp b/Source/WebKit2/Platform/CoreIPC/DataReference.cpp index 308fd6e..f186a35 100644 --- a/Source/WebKit2/Platform/CoreIPC/DataReference.cpp +++ b/Source/WebKit2/Platform/CoreIPC/DataReference.cpp @@ -23,6 +23,7 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "DataReference.h" #include "ArgumentDecoder.h" diff --git a/Source/WebKit2/Platform/CoreIPC/HandleMessage.h b/Source/WebKit2/Platform/CoreIPC/HandleMessage.h index 534e825..abbe089 100644 --- a/Source/WebKit2/Platform/CoreIPC/HandleMessage.h +++ b/Source/WebKit2/Platform/CoreIPC/HandleMessage.h @@ -159,6 +159,11 @@ void callMemberFunction(const Arguments5<P1, P2, P3, P4, P5>& args, Arguments2<R (object->*function)(args.argument1, args.argument2, args.argument3, args.argument4, args.argument5, replyArgs.argument1, replyArgs.argument2); } +template<typename C, typename MF, typename P1, typename P2, typename P3, typename P4, typename P5, typename P6, typename R1, typename R2> +void callMemberFunction(const Arguments6<P1, P2, P3, P4, P5, P6>& args, Arguments2<R1, R2>& replyArgs, C* object, MF function) +{ + (object->*function)(args.argument1, args.argument2, args.argument3, args.argument4, args.argument5, args.argument6, replyArgs.argument1, replyArgs.argument2); +} template<typename C, typename MF, typename P1, typename P2, typename P3, typename P4, typename R1, typename R2, typename R3> void callMemberFunction(const Arguments4<P1, P2, P3, P4>& args, Arguments3<R1, R2, R3>& replyArgs, C* object, MF function) @@ -210,6 +215,25 @@ void callMemberFunction(const Arguments6<P1, P2, P3, P4, P5, P6>& args, Argument (object->*function)(args.argument1, args.argument2, args.argument3, args.argument4, args.argument5, args.argument6, argumentDecoder); } +template<typename C, typename MF, typename P1, typename P2, typename P3, typename P4, typename P5, typename P6, typename P7> +void callMemberFunction(const Arguments7<P1, P2, P3, P4, P5, P6, P7>& args, ArgumentDecoder* argumentDecoder, C* object, MF function) +{ + (object->*function)(args.argument1, args.argument2, args.argument3, args.argument4, args.argument5, args.argument6, args.argument7, argumentDecoder); +} + +// Variadic dispatch functions with non-variadic reply arguments. + +template<typename C, typename MF, typename P1, typename P2, typename P3, typename P4, typename R1, typename R2, typename R3> +void callMemberFunction(const Arguments4<P1, P2, P3, P4>& args, ArgumentDecoder* argumentDecoder, Arguments3<R1, R2, R3>& replyArgs, C* object, MF function) +{ + (object->*function)(args.argument1, args.argument2, args.argument3, args.argument4, argumentDecoder, replyArgs.argument1, replyArgs.argument2, replyArgs.argument3); +} + +template<typename C, typename MF, typename P1, typename P2, typename P3, typename P4, typename P5, typename P6, typename R1, typename R2> +void callMemberFunction(const Arguments6<P1, P2, P3, P4, P5, P6>& args, ArgumentDecoder* argumentDecoder, Arguments2<R1, R2>& replyArgs, C* object, MF function) +{ + (object->*function)(args.argument1, args.argument2, args.argument3, args.argument4, args.argument5, args.argument6, argumentDecoder, replyArgs.argument1, replyArgs.argument2); +} // Main dispatch functions @@ -243,6 +267,19 @@ void handleMessageVariadic(ArgumentDecoder* argumentDecoder, C* object, MF funct callMemberFunction(arguments, argumentDecoder, object, function); } + +template<typename T, typename C, typename MF> +void handleMessageVariadic(ArgumentDecoder* argumentDecoder, ArgumentEncoder* replyEncoder, C* object, MF function) +{ + typename T::DecodeType::ValueType arguments; + if (!argumentDecoder->decode(arguments)) + return; + + typename T::Reply::ValueType replyArguments; + callMemberFunction(arguments, argumentDecoder, replyArguments, object, function); + replyEncoder->encode(replyArguments); +} + } // namespace CoreIPC #endif // HandleMessage_h diff --git a/Source/WebKit2/Platform/CoreIPC/MessageID.h b/Source/WebKit2/Platform/CoreIPC/MessageID.h index 724302c..bd8180a 100644 --- a/Source/WebKit2/Platform/CoreIPC/MessageID.h +++ b/Source/WebKit2/Platform/CoreIPC/MessageID.h @@ -95,6 +95,7 @@ class MessageID { public: enum Flags { SyncMessage = 1 << 0, + DispatchMessageWhenWaitingForSyncReply = 1 << 1, }; MessageID() @@ -108,6 +109,14 @@ public: { } + MessageID messageIDWithAddedFlags(unsigned char flags) + { + MessageID messageID; + + messageID.m_messageID = stripMostSignificantBit(m_messageID | (flags << 24)); + return messageID; + } + template <typename EnumType> EnumType get() const { @@ -137,6 +146,7 @@ public: unsigned toInt() const { return m_messageID; } + bool shouldDispatchMessageWhenWaitingForSyncReply() const { return getFlags() & DispatchMessageWhenWaitingForSyncReply; } bool isSync() const { return getFlags() & SyncMessage; } private: diff --git a/Source/WebKit2/Platform/CoreIPC/gtk/ConnectionGtk.cpp b/Source/WebKit2/Platform/CoreIPC/gtk/ConnectionGtk.cpp index d561110..4b140ee 100644 --- a/Source/WebKit2/Platform/CoreIPC/gtk/ConnectionGtk.cpp +++ b/Source/WebKit2/Platform/CoreIPC/gtk/ConnectionGtk.cpp @@ -24,6 +24,7 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "Connection.h" #include "ArgumentEncoder.h" @@ -98,7 +99,6 @@ void Connection::platformInitialize(Identifier identifier) m_pendingBytes = 0; m_readBuffer.resize(initialMessageBufferSize); m_socket = identifier; - m_isConnected = true; } void Connection::platformInvalidate() @@ -171,6 +171,8 @@ bool Connection::open() int flags = fcntl(m_socket, F_GETFL, 0); fcntl(m_socket, F_SETFL, flags | O_NONBLOCK); + m_isConnected = true; + // Register callbacks for connection termination and input data on the WorkQueue. m_connectionQueue.registerEventSourceHandler(m_socket, (G_IO_HUP | G_IO_ERR), WorkItem::create(this, &Connection::connectionDidClose)); m_connectionQueue.registerEventSourceHandler(m_socket, G_IO_IN, WorkItem::create(this, &Connection::readEventHandler)); diff --git a/Source/WebKit2/Platform/CoreIPC/mac/ConnectionMac.cpp b/Source/WebKit2/Platform/CoreIPC/mac/ConnectionMac.cpp index 5e7bbbc..5c4b5d1 100644 --- a/Source/WebKit2/Platform/CoreIPC/mac/ConnectionMac.cpp +++ b/Source/WebKit2/Platform/CoreIPC/mac/ConnectionMac.cpp @@ -23,6 +23,7 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "Connection.h" #include "CoreIPCMessageKinds.h" @@ -93,7 +94,7 @@ bool Connection::open() m_isConnected = true; // Send the initialize message, which contains a send right for the server to use. - send(CoreIPCMessage::InitializeConnection, 0, MachPort(m_receivePort, MACH_MSG_TYPE_MAKE_SEND)); + deprecatedSend(CoreIPCMessage::InitializeConnection, 0, MachPort(m_receivePort, MACH_MSG_TYPE_MAKE_SEND)); // Set the dead name handler for our send port. initializeDeadNameSource(); @@ -109,7 +110,7 @@ bool Connection::open() if (m_exceptionPort) { m_connectionQueue.registerMachPortEventHandler(m_exceptionPort, WorkQueue::MachPortDataAvailable, WorkItem::create(this, &Connection::exceptionSourceEventHandler)); - send(CoreIPCMessage::SetExceptionPort, 0, MachPort(m_exceptionPort, MACH_MSG_TYPE_MAKE_SEND)); + deprecatedSend(CoreIPCMessage::SetExceptionPort, 0, MachPort(m_exceptionPort, MACH_MSG_TYPE_MAKE_SEND)); } return true; diff --git a/Source/WebKit2/Platform/CoreIPC/qt/AttachmentQt.cpp b/Source/WebKit2/Platform/CoreIPC/qt/AttachmentQt.cpp new file mode 100644 index 0000000..4c0ebc0 --- /dev/null +++ b/Source/WebKit2/Platform/CoreIPC/qt/AttachmentQt.cpp @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2010 Apple Inc. All rights reserved. + * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies) + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "Attachment.h" + +#if PLATFORM(QT) +#include <unistd.h> +#include <errno.h> +#endif + + +namespace CoreIPC { + +Attachment::Attachment(int fileDescriptor, size_t size) + : m_type(MappedMemory) + , m_fileDescriptor(fileDescriptor) + , m_size(size) +{ + ASSERT(m_fileDescriptor); + ASSERT(m_size); +} + +void Attachment::dispose() +{ + if (m_fileDescriptor != -1) + while (close(m_fileDescriptor) == -1 && (errno == EINTR)) { } +} + +} // namespace CoreIPC diff --git a/Source/WebKit2/Platform/CoreIPC/qt/ConnectionQt.cpp b/Source/WebKit2/Platform/CoreIPC/qt/ConnectionQt.cpp index c0736b8..225d7dc 100644 --- a/Source/WebKit2/Platform/CoreIPC/qt/ConnectionQt.cpp +++ b/Source/WebKit2/Platform/CoreIPC/qt/ConnectionQt.cpp @@ -24,119 +24,346 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "Connection.h" #include "ArgumentEncoder.h" #include "ProcessLauncher.h" #include "WorkItem.h" +#include "SharedMemory.h" +#include "WebProcessProxy.h" #include <QApplication> -#include <QLocalServer> -#include <QLocalSocket> +#include <QSocketNotifier> +#include <sys/socket.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> +#include <wtf/Assertions.h> using namespace std; namespace CoreIPC { -// This is what other ports use... static const size_t messageMaxSize = 4096; +static const size_t attachmentMaxAmount = 255; + +enum { + MessageBodyIsOOL = 1 << 31 +}; + +class MessageInfo { +public: + MessageInfo() { } + + MessageInfo(MessageID messageID, size_t bodySize, size_t initialAttachmentCount) + : m_messageID(messageID.toInt()) + , m_bodySize(bodySize) + , m_attachmentCount(initialAttachmentCount) + { + ASSERT(!(m_messageID & MessageBodyIsOOL)); + } + + void setMessageBodyOOL() + { + ASSERT(!isMessageBodyOOL()); + + m_messageID |= MessageBodyIsOOL; + m_attachmentCount++; + } + + bool isMessageBodyOOL() const { return m_messageID & MessageBodyIsOOL; } + + size_t bodySize() const { return m_bodySize; } + + MessageID messageID() const { return MessageID::fromInt(m_messageID & ~MessageBodyIsOOL); } + + size_t attachmentCount() const { return m_attachmentCount; } + +private: + uint32_t m_messageID; + size_t m_bodySize; + size_t m_attachmentCount; +}; void Connection::platformInitialize(Identifier identifier) { - m_serverName = identifier; - m_socket = 0; + m_socketDescriptor = identifier; + m_socketNotifier = 0; m_readBuffer.resize(messageMaxSize); m_currentMessageSize = 0; } void Connection::platformInvalidate() { - m_socket->disconnect(); - if (!m_isServer) - m_socket->deleteLater(); - m_socket = 0; + if (m_socketDescriptor != -1) + while (close(m_socketDescriptor) == -1 && errno == EINTR) { } + + if (!m_isConnected) + return; + + delete m_socketNotifier; + m_socketNotifier = 0; + m_socketDescriptor = -1; + m_isConnected = false; } +class SocketNotifierResourceGuard { +public: + SocketNotifierResourceGuard(QSocketNotifier* socketNotifier) + : m_socketNotifier(socketNotifier) + { + m_socketNotifier->setEnabled(false); + } + + ~SocketNotifierResourceGuard() + { + m_socketNotifier->setEnabled(true); + } + +private: + QSocketNotifier* const m_socketNotifier; +}; + +template<class T, class iterator> +class AttachmentResourceGuard { +public: + AttachmentResourceGuard(T& attachments) + : m_attachments(attachments) + { + } + ~AttachmentResourceGuard() + { + iterator end = m_attachments.end(); + for (iterator i = m_attachments.begin(); i != end; ++i) + i->dispose(); + } +private: + T& m_attachments; +}; + void Connection::readyReadHandler() { - while (m_socket->bytesAvailable()) { - if (!m_currentMessageSize) { - size_t numberOfBytesRead = m_socket->read(reinterpret_cast<char*>(m_readBuffer.data()), sizeof(size_t)); - ASSERT_UNUSED(numberOfBytesRead, numberOfBytesRead); - m_currentMessageSize = *reinterpret_cast<size_t*>(m_readBuffer.data()); - } + Deque<Attachment> attachments; + SocketNotifierResourceGuard socketNotifierEnabler(m_socketNotifier); + AttachmentResourceGuard<Deque<Attachment>, Deque<Attachment>::iterator> attachementDisposer(attachments); + + char attachmentDescriptorBuffer[CMSG_SPACE(sizeof(int) * (attachmentMaxAmount))]; + struct msghdr message; + memset(&message, 0, sizeof(message)); + + struct iovec iov[1]; + memset(&iov, 0, sizeof(iov)); - if (m_socket->bytesAvailable() < m_currentMessageSize) + message.msg_control = attachmentDescriptorBuffer; + message.msg_controllen = CMSG_SPACE(sizeof(int) * (attachmentMaxAmount)); + + iov[0].iov_base = m_readBuffer.data(); + iov[0].iov_len = m_readBuffer.size(); + + message.msg_iov = iov; + message.msg_iovlen = 1; + + + int messageLength = 0; + while ((messageLength = recvmsg(m_socketDescriptor, &message, 0)) == -1) { + if (errno != EINTR) return; + } + + struct cmsghdr* controlMessage = CMSG_FIRSTHDR(&message); + + MessageInfo messageInfo; + unsigned char* messageData = m_readBuffer.data(); + + memcpy(&messageInfo, messageData, sizeof(messageInfo)); + ASSERT(messageLength == sizeof(messageInfo) + messageInfo.attachmentCount() * sizeof(size_t) + (messageInfo.isMessageBodyOOL() ? 0 : messageInfo.bodySize())); + + messageData += sizeof(messageInfo); + + RefPtr<WebKit::SharedMemory> oolMessageBody; + + if (messageInfo.attachmentCount()) { + if (controlMessage && controlMessage->cmsg_level == SOL_SOCKET && controlMessage->cmsg_type == SCM_RIGHTS) { + size_t attachmentSizes[messageInfo.attachmentCount()]; + memcpy(attachmentSizes, messageData, sizeof(attachmentSizes)); + + messageData += sizeof(attachmentSizes); + + int fileDescriptors[messageInfo.attachmentCount()]; + memcpy(fileDescriptors, CMSG_DATA(controlMessage), sizeof(fileDescriptors)); + + int attachmentCount = messageInfo.attachmentCount(); - if (m_readBuffer.size() < m_currentMessageSize) - m_readBuffer.grow(m_currentMessageSize); + if (messageInfo.isMessageBodyOOL()) + attachmentCount--; - size_t numberOfBytesRead = m_socket->read(reinterpret_cast<char*>(m_readBuffer.data()), m_currentMessageSize); - ASSERT_UNUSED(numberOfBytesRead, numberOfBytesRead); + for (int i = 0; i < attachmentCount; ++i) { + while (fcntl(fileDescriptors[i], F_SETFL, FD_CLOEXEC) == -1) { + if (errno != EINTR) { + ASSERT_NOT_REACHED(); + return; + } + } + } - // The messageID is encoded at the end of the buffer. - size_t realBufferSize = m_currentMessageSize - sizeof(uint32_t); - uint32_t messageID = *reinterpret_cast<uint32_t*>(m_readBuffer.data() + realBufferSize); + for (int i = 0; i < attachmentCount; ++i) + attachments.append(Attachment(fileDescriptors[i], attachmentSizes[i])); - processIncomingMessage(MessageID::fromInt(messageID), adoptPtr(new ArgumentDecoder(m_readBuffer.data(), realBufferSize))); + if (messageInfo.isMessageBodyOOL()) { + ASSERT(messageInfo.bodySize()); - m_currentMessageSize = 0; + WebKit::SharedMemory::Handle handle; + handle.adoptFromAttachment(fileDescriptors[attachmentCount], attachmentSizes[attachmentCount]); + if (handle.isNull()) { + ASSERT_NOT_REACHED(); + return; + } + + oolMessageBody = WebKit::SharedMemory::create(handle, WebKit::SharedMemory::ReadOnly); + if (!oolMessageBody) { + ASSERT_NOT_REACHED(); + return; + } + } + + controlMessage = CMSG_NXTHDR(&message, controlMessage); + } else { + ASSERT_NOT_REACHED(); + return; + } } + + ASSERT(attachments.size() == messageInfo.isMessageBodyOOL() ? messageInfo.attachmentCount() - 1 : messageInfo.attachmentCount()); + + unsigned char* messageBody = messageData; + + if (messageInfo.isMessageBodyOOL()) + messageBody = reinterpret_cast<unsigned char*>(oolMessageBody->data()); + + ArgumentDecoder* argumentDecoder; + if (attachments.isEmpty()) + argumentDecoder = new ArgumentDecoder(messageBody, messageInfo.bodySize()); + else + argumentDecoder = new ArgumentDecoder(messageBody, messageInfo.bodySize(), attachments); + + processIncomingMessage(messageInfo.messageID(), adoptPtr(argumentDecoder)); + + ASSERT(!controlMessage); } bool Connection::open() { - ASSERT(!m_socket); - - if (m_isServer) { - m_socket = WebKit::ProcessLauncher::takePendingConnection(); - m_isConnected = m_socket; - if (m_isConnected) { - m_connectionQueue.moveSocketToWorkThread(m_socket); - m_connectionQueue.connectSignal(m_socket, SIGNAL(readyRead()), WorkItem::create(this, &Connection::readyReadHandler)); + ASSERT(!m_socketNotifier); + int flags = fcntl(m_socketDescriptor, F_GETFL, 0); + while (fcntl(m_socketDescriptor, F_SETFL, flags | O_NONBLOCK) == -1) { + if (errno != EINTR) { + ASSERT_NOT_REACHED(); + return false; } - } else { - m_socket = new QLocalSocket(); - m_socket->connectToServer(m_serverName); - m_connectionQueue.moveSocketToWorkThread(m_socket); - m_connectionQueue.connectSignal(m_socket, SIGNAL(readyRead()), WorkItem::create(this, &Connection::readyReadHandler)); - m_connectionQueue.connectSignal(m_socket, SIGNAL(disconnected()), WorkItem::create(this, &Connection::connectionDidClose)); - m_isConnected = m_socket->waitForConnected(); } - return m_isConnected; + + m_isConnected = true; + m_socketNotifier = m_connectionQueue.registerSocketEventHandler(m_socketDescriptor, QSocketNotifier::Read, WorkItem::create(this, &Connection::readyReadHandler)); + + // Schedule a call to readyReadHandler. Data may have arrived before installation of the signal + // handler. + m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::readyReadHandler)); + + return true; } bool Connection::platformCanSendOutgoingMessages() const { - return m_socket; + return m_socketNotifier; } bool Connection::sendOutgoingMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments) { - ASSERT(m_socket); - - // We put the message ID last. - arguments->encodeUInt32(messageID.toInt()); + ASSERT(m_socketNotifier); + COMPILE_ASSERT(sizeof(MessageInfo) + attachmentMaxAmount * sizeof(size_t) <= messageMaxSize, AttachmentsFitToMessageInline); - size_t bufferSize = arguments->bufferSize(); + Vector<Attachment> attachments = arguments->releaseAttachments(); + AttachmentResourceGuard<Vector<Attachment>, Vector<Attachment>::iterator> attachementDisposer(attachments); - // Write message size first - // FIXME: Should just do a single write. - qint64 bytesWrittenForSize = m_socket->write(reinterpret_cast<char*>(&bufferSize), sizeof(bufferSize)); - if (bytesWrittenForSize != sizeof(bufferSize)) { - connectionDidClose(); + if (attachments.size() > (attachmentMaxAmount - 1)) { + ASSERT_NOT_REACHED(); return false; } - qint64 bytesWrittenForBuffer = m_socket->write(reinterpret_cast<char*>(arguments->buffer()), arguments->bufferSize()); - if (bytesWrittenForBuffer != arguments->bufferSize()) { - connectionDidClose(); - return false; + MessageInfo messageInfo(messageID, arguments->bufferSize(), attachments.size()); + size_t messageSizeWithBodyInline = sizeof(messageInfo) + (attachments.size() * sizeof(size_t)) + arguments->bufferSize(); + if (messageSizeWithBodyInline > messageMaxSize && arguments->bufferSize()) { + RefPtr<WebKit::SharedMemory> oolMessageBody = WebKit::SharedMemory::create(arguments->bufferSize()); + if (!oolMessageBody) + return false; + + WebKit::SharedMemory::Handle handle; + if (!oolMessageBody->createHandle(handle, WebKit::SharedMemory::ReadOnly)) + return false; + + messageInfo.setMessageBodyOOL(); + + memcpy(oolMessageBody->data(), arguments->buffer(), arguments->bufferSize()); + + attachments.append(handle.releaseToAttachment()); } - m_socket->flush(); + struct msghdr message; + memset(&message, 0, sizeof(message)); + + struct iovec iov[3]; + memset(&iov, 0, sizeof(iov)); + + message.msg_iov = iov; + int iovLength = 1; + + iov[0].iov_base = reinterpret_cast<void*>(&messageInfo); + iov[0].iov_len = sizeof(messageInfo); + + char attachmentFDBuffer[CMSG_SPACE(sizeof(int) * (attachments.size()))]; + size_t attachmentSizes[attachments.size()]; + + if (!attachments.isEmpty()) { + message.msg_control = attachmentFDBuffer; + message.msg_controllen = sizeof(attachmentFDBuffer); + + struct cmsghdr* cmsg = CMSG_FIRSTHDR(&message); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int) * attachments.size()); + + int* fdptr = reinterpret_cast<int*>(CMSG_DATA(cmsg)); + for (int i = 0; i < attachments.size(); ++i) { + attachmentSizes[i] = attachments[i].size(); + fdptr[i] = attachments[i].fileDescriptor(); + } + + message.msg_controllen = cmsg->cmsg_len; + iov[iovLength].iov_base = attachmentSizes; + iov[iovLength].iov_len = sizeof(attachmentSizes); + ++iovLength; + } + + if (!messageInfo.isMessageBodyOOL() && arguments->bufferSize()) { + iov[iovLength].iov_base = reinterpret_cast<void*>(arguments->buffer()); + iov[iovLength].iov_len = arguments->bufferSize(); + ++iovLength; + } + + message.msg_iovlen = iovLength; + + int bytesSent = 0; + while ((bytesSent = sendmsg(m_socketDescriptor, &message, 0)) == -1) { + if (errno != EINTR) + return false; + } return true; } +void Connection::setShouldCloseConnectionOnProcessTermination(WebKit::PlatformProcessIdentifier process) +{ + m_connectionQueue.scheduleWorkOnTermination(process, WorkItem::create(this, &Connection::connectionDidClose)); +} + } // namespace CoreIPC diff --git a/Source/WebKit2/Platform/CoreIPC/win/BinarySemaphoreWin.cpp b/Source/WebKit2/Platform/CoreIPC/win/BinarySemaphoreWin.cpp new file mode 100644 index 0000000..9b26a9a --- /dev/null +++ b/Source/WebKit2/Platform/CoreIPC/win/BinarySemaphoreWin.cpp @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2011 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "BinarySemaphore.h" + +namespace CoreIPC { + +BinarySemaphore::BinarySemaphore() + : m_event(::CreateEventW(0, FALSE, FALSE, 0)) +{ +} + +BinarySemaphore::~BinarySemaphore() +{ + ::CloseHandle(m_event); +} + +void BinarySemaphore::signal() +{ + ::SetEvent(m_event); +} + +bool BinarySemaphore::wait(double absoluteTime) +{ + DWORD interval = absoluteTimeToWaitTimeoutInterval(absoluteTime); + if (!interval) { + // Consider the wait to have timed out, even if the event has already been signaled, to + // match the WTF::ThreadCondition implementation. + return false; + } + + DWORD result = ::WaitForSingleObjectEx(m_event, interval, FALSE); + switch (result) { + case WAIT_OBJECT_0: + // The event was signaled. + return true; + + case WAIT_TIMEOUT: + // The wait timed out. + return false; + + case WAIT_FAILED: + ASSERT_WITH_MESSAGE(false, "::WaitForSingleObjectEx failed with error %lu", ::GetLastError()); + return false; + default: + ASSERT_WITH_MESSAGE(false, "::WaitForSingleObjectEx returned unexpected result %lu", result); + return false; + } +} + +} // namespace CoreIPC diff --git a/Source/WebKit2/Platform/CoreIPC/win/ConnectionWin.cpp b/Source/WebKit2/Platform/CoreIPC/win/ConnectionWin.cpp index 695da78..ab44658 100644 --- a/Source/WebKit2/Platform/CoreIPC/win/ConnectionWin.cpp +++ b/Source/WebKit2/Platform/CoreIPC/win/ConnectionWin.cpp @@ -23,6 +23,7 @@ * THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "Connection.h" #include "ArgumentEncoder.h" @@ -88,9 +89,6 @@ void Connection::platformInitialize(Identifier identifier) m_writeState.hEvent = ::CreateEventW(0, FALSE, FALSE, 0); m_connectionPipe = identifier; - - // We connected the two ends of the pipe in createServerAndClientIdentifiers. - m_isConnected = true; } void Connection::platformInvalidate() @@ -98,6 +96,8 @@ void Connection::platformInvalidate() if (m_connectionPipe == INVALID_HANDLE_VALUE) return; + m_isConnected = false; + m_connectionQueue.unregisterAndCloseHandle(m_readState.hEvent); m_readState.hEvent = 0; @@ -242,6 +242,10 @@ void Connection::writeEventHandler() // FIXME: We should figure out why we're getting this error. return; } + if (error == ERROR_BROKEN_PIPE) { + connectionDidClose(); + return; + } ASSERT_NOT_REACHED(); } @@ -255,6 +259,9 @@ void Connection::writeEventHandler() bool Connection::open() { + // We connected the two ends of the pipe in createServerAndClientIdentifiers. + m_isConnected = true; + // Start listening for read and write state events. m_connectionQueue.registerHandle(m_readState.hEvent, WorkItem::create(this, &Connection::readEventHandler)); m_connectionQueue.registerHandle(m_writeState.hEvent, WorkItem::create(this, &Connection::writeEventHandler)); |