diff options
author | Ben Murdoch <benm@google.com> | 2011-05-13 16:23:25 +0100 |
---|---|---|
committer | Ben Murdoch <benm@google.com> | 2011-05-16 11:35:02 +0100 |
commit | 65f03d4f644ce73618e5f4f50dd694b26f55ae12 (patch) | |
tree | f478babb801e720de7bfaee23443ffe029f58731 /Source/WebKit2/Platform/CoreIPC/Connection.h | |
parent | 47de4a2fb7262c7ebdb9cd133ad2c54c187454d0 (diff) | |
download | external_webkit-65f03d4f644ce73618e5f4f50dd694b26f55ae12.zip external_webkit-65f03d4f644ce73618e5f4f50dd694b26f55ae12.tar.gz external_webkit-65f03d4f644ce73618e5f4f50dd694b26f55ae12.tar.bz2 |
Merge WebKit at r75993: Initial merge by git.
Change-Id: I602bbdc3974787a3b0450456a30a7868286921c3
Diffstat (limited to 'Source/WebKit2/Platform/CoreIPC/Connection.h')
-rw-r--r-- | Source/WebKit2/Platform/CoreIPC/Connection.h | 362 |
1 files changed, 362 insertions, 0 deletions
diff --git a/Source/WebKit2/Platform/CoreIPC/Connection.h b/Source/WebKit2/Platform/CoreIPC/Connection.h new file mode 100644 index 0000000..b7e5b0f --- /dev/null +++ b/Source/WebKit2/Platform/CoreIPC/Connection.h @@ -0,0 +1,362 @@ +/* + * Copyright (C) 2010 Apple Inc. All rights reserved. + * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies) + * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef Connection_h +#define Connection_h + +#include "ArgumentDecoder.h" +#include "ArgumentEncoder.h" +#include "Arguments.h" +#include "BinarySemaphore.h" +#include "MessageID.h" +#include "WorkQueue.h" +#include <wtf/HashMap.h> +#include <wtf/PassRefPtr.h> +#include <wtf/OwnPtr.h> +#include <wtf/Threading.h> + +#if PLATFORM(MAC) +#include <mach/mach_port.h> +#elif PLATFORM(WIN) +#include <string> +#elif PLATFORM(QT) +#include <QString> +class QLocalServer; +class QLocalSocket; +#endif + +class RunLoop; + +namespace CoreIPC { + +class MessageID; + +enum SyncReplyMode { + AutomaticReply, + ManualReply +}; + +#define MESSAGE_CHECK_BASE(assertion, connection) do \ + if (!(assertion)) { \ + ASSERT(assertion); \ + (connection)->markCurrentlyDispatchedMessageAsInvalid(); \ + return; \ + } \ +while (0) + +class Connection : public ThreadSafeShared<Connection> { +public: + class MessageReceiver { + protected: + virtual ~MessageReceiver() { } + + public: + virtual void didReceiveMessage(Connection*, MessageID, ArgumentDecoder*) = 0; + virtual SyncReplyMode didReceiveSyncMessage(Connection*, MessageID, ArgumentDecoder*, ArgumentEncoder*) { ASSERT_NOT_REACHED(); return AutomaticReply; } + }; + + class Client : public MessageReceiver { + protected: + virtual ~Client() { } + + public: + virtual void didClose(Connection*) = 0; + virtual void didReceiveInvalidMessage(Connection*, MessageID) = 0; + + // Called on the connection work queue when the connection is closed, before + // didCall is called on the client thread. + virtual void didCloseOnConnectionWorkQueue(WorkQueue*, Connection*) { } + }; + +#if PLATFORM(MAC) + typedef mach_port_t Identifier; +#elif PLATFORM(WIN) + typedef HANDLE Identifier; + static bool createServerAndClientIdentifiers(Identifier& serverIdentifier, Identifier& clientIdentifier); +#elif PLATFORM(QT) + typedef const QString Identifier; +#elif PLATFORM(GTK) + typedef int Identifier; +#endif + + static PassRefPtr<Connection> createServerConnection(Identifier, Client*, RunLoop* clientRunLoop); + static PassRefPtr<Connection> createClientConnection(Identifier, Client*, RunLoop* clientRunLoop); + ~Connection(); + +#if PLATFORM(MAC) + void setShouldCloseConnectionOnMachExceptions(); +#endif + + bool open(); + void invalidate(); + void markCurrentlyDispatchedMessageAsInvalid(); + + // FIXME: This variant of send is deprecated, all clients should move to the overload that takes a message. + template<typename E, typename T> bool send(E messageID, uint64_t destinationID, const T& arguments); + + template<typename T> bool send(const T& message, uint64_t destinationID); + + static const unsigned long long NoTimeout = 10000000000ULL; + // FIXME: This variant of sendSync is deprecated, all clients should move to the overload that takes a message. + template<typename E, typename T, typename U> bool sendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout = NoTimeout); + + template<typename T> bool sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout = NoTimeout); + + template<typename E> PassOwnPtr<ArgumentDecoder> waitFor(E messageID, uint64_t destinationID, double timeout); + + PassOwnPtr<ArgumentEncoder> createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID); + bool sendMessage(MessageID, PassOwnPtr<ArgumentEncoder>); + bool sendSyncReply(PassOwnPtr<ArgumentEncoder>); + +private: + template<typename T> class Message { + public: + Message() + : m_arguments(0) + { + } + + Message(MessageID messageID, PassOwnPtr<T> arguments) + : m_messageID(messageID) + , m_arguments(arguments.leakPtr()) + { + } + + MessageID messageID() const { return m_messageID; } + T* arguments() const { return m_arguments; } + + PassOwnPtr<T> releaseArguments() + { + T* arguments = m_arguments; + m_arguments = 0; + + return arguments; + } + + private: + MessageID m_messageID; + T* m_arguments; + }; + +public: + typedef Message<ArgumentEncoder> OutgoingMessage; + +private: + Connection(Identifier, bool isServer, Client*, RunLoop* clientRunLoop); + void platformInitialize(Identifier); + void platformInvalidate(); + + bool isValid() const { return m_client; } + + PassOwnPtr<ArgumentDecoder> waitForMessage(MessageID, uint64_t destinationID, double timeout); + + PassOwnPtr<ArgumentDecoder> sendSyncMessage(MessageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder>, double timeout); + PassOwnPtr<ArgumentDecoder> waitForSyncReply(uint64_t syncRequestID, double timeout); + + // Called on the connection work queue. + void processIncomingMessage(MessageID, PassOwnPtr<ArgumentDecoder>); + bool canSendOutgoingMessages() const; + bool platformCanSendOutgoingMessages() const; + void sendOutgoingMessages(); + bool sendOutgoingMessage(MessageID, PassOwnPtr<ArgumentEncoder>); + void connectionDidClose(); + + // Called on the listener thread. + void dispatchConnectionDidClose(); + void dispatchMessages(); + void dispatchSyncMessage(MessageID, ArgumentDecoder*); + + Client* m_client; + bool m_isServer; + uint64_t m_syncRequestID; + + bool m_isConnected; + WorkQueue m_connectionQueue; + RunLoop* m_clientRunLoop; + + uint32_t m_inDispatchMessageCount; + bool m_didReceiveInvalidMessage; + + // Incoming messages. + typedef Message<ArgumentDecoder> IncomingMessage; + + Mutex m_incomingMessagesLock; + Vector<IncomingMessage> m_incomingMessages; + + // Outgoing messages. + Mutex m_outgoingMessagesLock; + Deque<OutgoingMessage> m_outgoingMessages; + + ThreadCondition m_waitForMessageCondition; + Mutex m_waitForMessageMutex; + HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*> m_waitForMessageMap; + + // Represents a sync request for which we're waiting on a reply. + struct PendingSyncReply { + // The request ID. + uint64_t syncRequestID; + + // The reply decoder, will be null if there was an error processing the sync + // message on the other side. + ArgumentDecoder* replyDecoder; + + // Will be set to true once a reply has been received or an error occurred. + bool didReceiveReply; + + PendingSyncReply() + : syncRequestID(0) + , replyDecoder(0) + , didReceiveReply(false) + { + } + + explicit PendingSyncReply(uint64_t syncRequestID) + : syncRequestID(syncRequestID) + , replyDecoder(0) + , didReceiveReply(0) + { + } + + PassOwnPtr<ArgumentDecoder> releaseReplyDecoder() + { + OwnPtr<ArgumentDecoder> reply = adoptPtr(replyDecoder); + replyDecoder = 0; + + return reply.release(); + } + }; + + BinarySemaphore m_waitForSyncReplySemaphore; + + Mutex m_syncReplyStateMutex; + bool m_shouldWaitForSyncReplies; + Vector<PendingSyncReply> m_pendingSyncReplies; + Vector<IncomingMessage> m_syncMessagesReceivedWhileWaitingForSyncReply; + +#if PLATFORM(MAC) + // Called on the connection queue. + void receiveSourceEventHandler(); + void initializeDeadNameSource(); + void exceptionSourceEventHandler(); + + mach_port_t m_sendPort; + mach_port_t m_receivePort; + + // If setShouldCloseConnectionOnMachExceptions has been called, this has + // the exception port that exceptions from the other end will be sent on. + mach_port_t m_exceptionPort; + +#elif PLATFORM(WIN) + // Called on the connection queue. + void readEventHandler(); + void writeEventHandler(); + + Vector<uint8_t> m_readBuffer; + OVERLAPPED m_readState; + OwnPtr<ArgumentEncoder> m_pendingWriteArguments; + OVERLAPPED m_writeState; + HANDLE m_connectionPipe; +#elif PLATFORM(QT) + // Called on the connection queue. + void readyReadHandler(); + + Vector<uint8_t> m_readBuffer; + size_t m_currentMessageSize; + QLocalSocket* m_socket; + QString m_serverName; +#elif PLATFORM(GTK) + void readEventHandler(); + void processCompletedMessage(); + bool messageProcessingCompleted() { return !m_currentMessageSize; } + + int m_socket; + Vector<uint8_t> m_readBuffer; + size_t m_currentMessageSize; + size_t m_pendingBytes; +#endif +}; + +template<typename E, typename T> +bool Connection::send(E messageID, uint64_t destinationID, const T& arguments) +{ + OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID); + argumentEncoder->encode(arguments); + + return sendMessage(MessageID(messageID), argumentEncoder.release()); +} + +template<typename T> bool Connection::send(const T& message, uint64_t destinationID) +{ + OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID); + argumentEncoder->encode(message); + + return sendMessage(MessageID(T::messageID), argumentEncoder.release()); +} + +template<typename E, typename T, typename U> +inline bool Connection::sendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout) +{ + uint64_t syncRequestID = 0; + OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID); + + // Encode the input arguments. + argumentEncoder->encode(arguments); + + // Now send the message and wait for a reply. + OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(messageID, MessageID::SyncMessage), syncRequestID, argumentEncoder.release(), timeout); + if (!replyDecoder) + return false; + + // Decode the reply. + return replyDecoder->decode(const_cast<U&>(reply)); +} + +template<typename T> bool Connection::sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout) +{ + uint64_t syncRequestID = 0; + OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID); + + // Encode the rest of the input arguments. + argumentEncoder->encode(message); + + // Now send the message and wait for a reply. + OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(T::messageID, MessageID::SyncMessage), syncRequestID, argumentEncoder.release(), timeout); + if (!replyDecoder) + return false; + + // Decode the reply. + return replyDecoder->decode(const_cast<typename T::Reply&>(reply)); +} + +template<typename E> inline PassOwnPtr<ArgumentDecoder> Connection::waitFor(E messageID, uint64_t destinationID, double timeout) +{ + return waitForMessage(MessageID(messageID), destinationID, timeout); +} + +} // namespace CoreIPC + +#endif // Connection_h |