diff options
Diffstat (limited to 'libcutils/mq.c')
-rw-r--r-- | libcutils/mq.c | 1357 |
1 files changed, 0 insertions, 1357 deletions
diff --git a/libcutils/mq.c b/libcutils/mq.c deleted file mode 100644 index 899b1bc..0000000 --- a/libcutils/mq.c +++ /dev/null @@ -1,1357 +0,0 @@ -/* - * Copyright (C) 2007 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define LOG_TAG "mq" - -#include <assert.h> -#include <errno.h> -#include <fcntl.h> -#include <pthread.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> - -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/un.h> -#include <sys/uio.h> - -#include <cutils/array.h> -#include <cutils/hashmap.h> -#include <cutils/selector.h> - -#include "loghack.h" -#include "buffer.h" - -/** Number of dead peers to remember. */ -#define PEER_HISTORY (16) - -typedef struct sockaddr SocketAddress; -typedef struct sockaddr_un UnixAddress; - -/** - * Process/user/group ID. We don't use ucred directly because it's only - * available on Linux. - */ -typedef struct { - pid_t pid; - uid_t uid; - gid_t gid; -} Credentials; - -/** Listens for bytes coming from remote peers. */ -typedef void BytesListener(Credentials credentials, char* bytes, size_t size); - -/** Listens for the deaths of remote peers. */ -typedef void DeathListener(pid_t pid); - -/** Types of packets. */ -typedef enum { - /** Request for a connection to another peer. */ - CONNECTION_REQUEST, - - /** A connection to another peer. */ - CONNECTION, - - /** Reports a failed connection attempt. */ - CONNECTION_ERROR, - - /** A generic packet of bytes. */ - BYTES, -} PacketType; - -typedef enum { - /** Reading a packet header. */ - READING_HEADER, - - /** Waiting for a connection from the master. */ - ACCEPTING_CONNECTION, - - /** Reading bytes. */ - READING_BYTES, -} InputState; - -/** A packet header. */ -// TODO: Use custom headers for master->peer, peer->master, peer->peer. -typedef struct { - PacketType type; - union { - /** Packet size. Used for BYTES. */ - size_t size; - - /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */ - Credentials credentials; - }; -} Header; - -/** A packet which will be sent to a peer. */ -typedef struct OutgoingPacket OutgoingPacket; -struct OutgoingPacket { - /** Packet header. */ - Header header; - - union { - /** Connection to peer. Used with CONNECTION. */ - int socket; - - /** Buffer of bytes. Used with BYTES. */ - Buffer* bytes; - }; - - /** Frees all resources associated with this packet. */ - void (*free)(OutgoingPacket* packet); - - /** Optional context. */ - void* context; - - /** Next packet in the queue. */ - OutgoingPacket* nextPacket; -}; - -/** Represents a remote peer. */ -typedef struct PeerProxy PeerProxy; - -/** Local peer state. You typically have one peer per process. */ -typedef struct { - /** This peer's PID. */ - pid_t pid; - - /** - * Map from pid to peer proxy. The peer has a peer proxy for each remote - * peer it's connected to. - * - * Acquire mutex before use. - */ - Hashmap* peerProxies; - - /** Manages I/O. */ - Selector* selector; - - /** Used to synchronize operations with the selector thread. */ - pthread_mutex_t mutex; - - /** Is this peer the master? */ - bool master; - - /** Peer proxy for the master. */ - PeerProxy* masterProxy; - - /** Listens for packets from remote peers. */ - BytesListener* onBytes; - - /** Listens for deaths of remote peers. */ - DeathListener* onDeath; - - /** Keeps track of recently dead peers. Requires mutex. */ - pid_t deadPeers[PEER_HISTORY]; - size_t deadPeerCursor; -} Peer; - -struct PeerProxy { - /** Credentials of the remote process. */ - Credentials credentials; - - /** Keeps track of data coming in from the remote peer. */ - InputState inputState; - Buffer* inputBuffer; - PeerProxy* connecting; - - /** File descriptor for this peer. */ - SelectableFd* fd; - - /** - * Queue of packets to be written out to the remote peer. - * - * Requires mutex. - */ - // TODO: Limit queue length. - OutgoingPacket* currentPacket; - OutgoingPacket* lastPacket; - - /** Used to write outgoing header. */ - Buffer outgoingHeader; - - /** True if this is the master's proxy. */ - bool master; - - /** Reference back to the local peer. */ - Peer* peer; - - /** - * Used in master only. Maps this peer proxy to other peer proxies to - * which the peer has been connected to. Maps pid to PeerProxy. Helps - * keep track of which connections we've sent to whom. - */ - Hashmap* connections; -}; - -/** Server socket path. */ -static const char* MASTER_PATH = "/master.peer"; - -/** Credentials of the master peer. */ -static const Credentials MASTER_CREDENTIALS = {0, 0, 0}; - -/** Creates a peer proxy and adds it to the peer proxy map. */ -static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials); - -/** Sets the non-blocking flag on a descriptor. */ -static void setNonBlocking(int fd) { - int flags; - if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { - LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); - } - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { - LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno)); - } -} - -/** Closes a fd and logs a warning if the close fails. */ -static void closeWithWarning(int fd) { - int result = close(fd); - if (result == -1) { - ALOGW("close() error: %s", strerror(errno)); - } -} - -/** Hashes pid_t keys. */ -static int pidHash(void* key) { - pid_t* pid = (pid_t*) key; - return (int) (*pid); -} - -/** Compares pid_t keys. */ -static bool pidEquals(void* keyA, void* keyB) { - pid_t* a = (pid_t*) keyA; - pid_t* b = (pid_t*) keyB; - return *a == *b; -} - -/** Gets the master address. Not thread safe. */ -static UnixAddress* getMasterAddress() { - static UnixAddress masterAddress; - static bool initialized = false; - if (initialized == false) { - masterAddress.sun_family = AF_LOCAL; - strcpy(masterAddress.sun_path, MASTER_PATH); - initialized = true; - } - return &masterAddress; -} - -/** Gets exclusive access to the peer for this thread. */ -static void peerLock(Peer* peer) { - pthread_mutex_lock(&peer->mutex); -} - -/** Releases exclusive access to the peer. */ -static void peerUnlock(Peer* peer) { - pthread_mutex_unlock(&peer->mutex); -} - -/** Frees a simple, i.e. header-only, outgoing packet. */ -static void outgoingPacketFree(OutgoingPacket* packet) { - ALOGD("Freeing outgoing packet."); - free(packet); -} - -/** - * Prepare to read a new packet from the peer. - */ -static void peerProxyExpectHeader(PeerProxy* peerProxy) { - peerProxy->inputState = READING_HEADER; - bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header)); -} - -/** Sets up the buffer for the outgoing header. */ -static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) { - peerProxy->outgoingHeader.data - = (char*) &(peerProxy->currentPacket->header); - peerProxy->outgoingHeader.size = sizeof(Header); - bufferPrepareForWrite(&peerProxy->outgoingHeader); -} - -/** Adds a packet to the end of the queue. Callers must have the mutex. */ -static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy, - OutgoingPacket* newPacket) { - newPacket->nextPacket = NULL; // Just in case. - if (peerProxy->currentPacket == NULL) { - // The queue is empty. - peerProxy->currentPacket = newPacket; - peerProxy->lastPacket = newPacket; - - peerProxyPrepareOutgoingHeader(peerProxy); - } else { - peerProxy->lastPacket->nextPacket = newPacket; - } -} - -/** Takes the peer lock and enqueues the given packet. */ -static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy, - OutgoingPacket* newPacket) { - Peer* peer = peerProxy->peer; - peerLock(peer); - peerProxyEnqueueOutgoingPacket(peerProxy, newPacket); - peerUnlock(peer); -} - -/** - * Frees current packet and moves to the next one. Returns true if there is - * a next packet or false if the queue is empty. - */ -static bool peerProxyNextPacket(PeerProxy* peerProxy) { - Peer* peer = peerProxy->peer; - peerLock(peer); - - OutgoingPacket* current = peerProxy->currentPacket; - - if (current == NULL) { - // The queue is already empty. - peerUnlock(peer); - return false; - } - - OutgoingPacket* next = current->nextPacket; - peerProxy->currentPacket = next; - current->nextPacket = NULL; - current->free(current); - if (next == NULL) { - // The queue is empty. - peerProxy->lastPacket = NULL; - peerUnlock(peer); - return false; - } else { - peerUnlock(peer); - peerProxyPrepareOutgoingHeader(peerProxy); - - // TODO: Start writing next packet? It would reduce the number of - // system calls, but we could also starve other peers. - return true; - } -} - -/** - * Checks whether a peer died recently. - */ -static bool peerIsDead(Peer* peer, pid_t pid) { - size_t i; - for (i = 0; i < PEER_HISTORY; i++) { - pid_t deadPeer = peer->deadPeers[i]; - if (deadPeer == 0) { - return false; - } - if (deadPeer == pid) { - return true; - } - } - return false; -} - -/** - * Cleans up connection information. - */ -static bool peerProxyRemoveConnection(void* key, void* value, void* context) { - PeerProxy* deadPeer = (PeerProxy*) context; - PeerProxy* otherPeer = (PeerProxy*) value; - hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid)); - return true; -} - -/** - * Called when the peer dies. - */ -static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) { - if (errnoIsSet) { - ALOGI("Peer %d died. errno: %s", peerProxy->credentials.pid, - strerror(errno)); - } else { - ALOGI("Peer %d died.", peerProxy->credentials.pid); - } - - // If we lost the master, we're up a creek. We can't let this happen. - if (peerProxy->master) { - LOG_ALWAYS_FATAL("Lost connection to master."); - } - - Peer* localPeer = peerProxy->peer; - pid_t pid = peerProxy->credentials.pid; - - peerLock(localPeer); - - // Remember for awhile that the peer died. - localPeer->deadPeers[localPeer->deadPeerCursor] - = peerProxy->credentials.pid; - localPeer->deadPeerCursor++; - if (localPeer->deadPeerCursor == PEER_HISTORY) { - localPeer->deadPeerCursor = 0; - } - - // Remove from peer map. - hashmapRemove(localPeer->peerProxies, &pid); - - // External threads can no longer get to this peer proxy, so we don't - // need the lock anymore. - peerUnlock(localPeer); - - // Remove the fd from the selector. - if (peerProxy->fd != NULL) { - peerProxy->fd->remove = true; - } - - // Clear outgoing packet queue. - while (peerProxyNextPacket(peerProxy)) {} - - bufferFree(peerProxy->inputBuffer); - - // This only applies to the master. - if (peerProxy->connections != NULL) { - // We can't leave these other maps pointing to freed memory. - hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection, - peerProxy); - hashmapFree(peerProxy->connections); - } - - // Invoke death listener. - localPeer->onDeath(pid); - - // Free the peer proxy itself. - free(peerProxy); -} - -static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) { - if (errno == EINTR) { - // Log interruptions but otherwise ignore them. - ALOGW("%s() interrupted.", functionName); - } else if (errno == EAGAIN) { - ALOGD("EWOULDBLOCK"); - // Ignore. - } else { - ALOGW("Error returned by %s().", functionName); - peerProxyKill(peerProxy, true); - } -} - -/** - * Buffers output sent to a peer. May be called multiple times until the entire - * buffer is filled. Returns true when the buffer is empty. - */ -static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) { - ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd); - if (size < 0) { - peerProxyHandleError(peerProxy, "write"); - return false; - } else { - return bufferWriteComplete(outgoing); - } -} - -/** Writes packet bytes to peer. */ -static void peerProxyWriteBytes(PeerProxy* peerProxy) { - Buffer* buffer = peerProxy->currentPacket->bytes; - if (peerProxyWriteFromBuffer(peerProxy, buffer)) { - ALOGD("Bytes written."); - peerProxyNextPacket(peerProxy); - } -} - -/** Sends a socket to the peer. */ -static void peerProxyWriteConnection(PeerProxy* peerProxy) { - int socket = peerProxy->currentPacket->socket; - - // Why does sending and receiving fds have to be such a PITA? - struct msghdr msg; - struct iovec iov[1]; - - union { - struct cmsghdr cm; - char control[CMSG_SPACE(sizeof(int))]; - } control_un; - - struct cmsghdr *cmptr; - - msg.msg_control = control_un.control; - msg.msg_controllen = sizeof(control_un.control); - cmptr = CMSG_FIRSTHDR(&msg); - cmptr->cmsg_len = CMSG_LEN(sizeof(int)); - cmptr->cmsg_level = SOL_SOCKET; - cmptr->cmsg_type = SCM_RIGHTS; - - // Store the socket in the message. - *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - iov[0].iov_base = ""; - iov[0].iov_len = 1; - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0); - - if (result < 0) { - peerProxyHandleError(peerProxy, "sendmsg"); - } else { - // Success. Queue up the next packet. - peerProxyNextPacket(peerProxy); - - } -} - -/** - * Writes some outgoing data. - */ -static void peerProxyWrite(SelectableFd* fd) { - // TODO: Try to write header and body with one system call. - - PeerProxy* peerProxy = (PeerProxy*) fd->data; - OutgoingPacket* current = peerProxy->currentPacket; - - if (current == NULL) { - // We have nothing left to write. - return; - } - - // Write the header. - Buffer* outgoingHeader = &peerProxy->outgoingHeader; - bool headerWritten = bufferWriteComplete(outgoingHeader); - if (!headerWritten) { - ALOGD("Writing header..."); - headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader); - if (headerWritten) { - ALOGD("Header written."); - } - } - - // Write body. - if (headerWritten) { - PacketType type = current->header.type; - switch (type) { - case CONNECTION: - peerProxyWriteConnection(peerProxy); - break; - case BYTES: - peerProxyWriteBytes(peerProxy); - break; - case CONNECTION_REQUEST: - case CONNECTION_ERROR: - // These packets consist solely of a header. - peerProxyNextPacket(peerProxy); - break; - default: - LOG_ALWAYS_FATAL("Unknown packet type: %d", type); - } - } -} - -/** - * Sets up a peer proxy's fd before we try to select() it. - */ -static void peerProxyBeforeSelect(SelectableFd* fd) { - ALOGD("Before select..."); - - PeerProxy* peerProxy = (PeerProxy*) fd->data; - - peerLock(peerProxy->peer); - bool hasPackets = peerProxy->currentPacket != NULL; - peerUnlock(peerProxy->peer); - - if (hasPackets) { - ALOGD("Packets found. Setting onWritable()."); - - fd->onWritable = &peerProxyWrite; - } else { - // We have nothing to write. - fd->onWritable = NULL; - } -} - -/** Prepare to read bytes from the peer. */ -static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) { - ALOGD("Expecting %zd bytes.", header->size); - - peerProxy->inputState = READING_BYTES; - if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) { - ALOGW("Couldn't allocate memory for incoming data. Size: %u", - (unsigned int) header->size); - - // TODO: Ignore the packet and log a warning? - peerProxyKill(peerProxy, false); - } -} - -/** - * Gets a peer proxy for the given ID. Creates a peer proxy if necessary. - * Sends a connection request to the master if desired. - * - * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died - * or ENOMEM if memory couldn't be allocated. - */ -static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid, - bool requestConnection) { - if (pid == peer->pid) { - errno = EINVAL; - return NULL; - } - - if (peerIsDead(peer, pid)) { - errno = EHOSTDOWN; - return NULL; - } - - PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid); - if (peerProxy != NULL) { - return peerProxy; - } - - // If this is the master peer, we already know about all peers. - if (peer->master) { - errno = EHOSTDOWN; - return NULL; - } - - // Try to create a peer proxy. - Credentials credentials; - credentials.pid = pid; - - // Fake gid and uid until we have the real thing. The real creds are - // filled in by masterProxyExpectConnection(). These fake creds will - // never be exposed to the user. - credentials.uid = 0; - credentials.gid = 0; - - // Make sure we can allocate the connection request packet. - OutgoingPacket* packet = NULL; - if (requestConnection) { - packet = calloc(1, sizeof(OutgoingPacket)); - if (packet == NULL) { - errno = ENOMEM; - return NULL; - } - - packet->header.type = CONNECTION_REQUEST; - packet->header.credentials = credentials; - packet->free = &outgoingPacketFree; - } - - peerProxy = peerProxyCreate(peer, credentials); - if (peerProxy == NULL) { - free(packet); - errno = ENOMEM; - return NULL; - } else { - // Send a connection request to the master. - if (requestConnection) { - PeerProxy* masterProxy = peer->masterProxy; - peerProxyEnqueueOutgoingPacket(masterProxy, packet); - } - - return peerProxy; - } -} - -/** - * Switches the master peer proxy into a state where it's waiting for a - * connection from the master. - */ -static void masterProxyExpectConnection(PeerProxy* masterProxy, - Header* header) { - // TODO: Restructure things so we don't need this check. - // Verify that this really is the master. - if (!masterProxy->master) { - ALOGW("Non-master process %d tried to send us a connection.", - masterProxy->credentials.pid); - // Kill off the evil peer. - peerProxyKill(masterProxy, false); - return; - } - - masterProxy->inputState = ACCEPTING_CONNECTION; - Peer* localPeer = masterProxy->peer; - - // Create a peer proxy so we have somewhere to stash the creds. - // See if we already have a proxy set up. - pid_t pid = header->credentials.pid; - peerLock(localPeer); - PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false); - if (peerProxy == NULL) { - ALOGW("Peer proxy creation failed: %s", strerror(errno)); - } else { - // Fill in full credentials. - peerProxy->credentials = header->credentials; - } - peerUnlock(localPeer); - - // Keep track of which peer proxy we're accepting a connection for. - masterProxy->connecting = peerProxy; -} - -/** - * Reads input from a peer process. - */ -static void peerProxyRead(SelectableFd* fd); - -/** Sets up fd callbacks. */ -static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) { - peerProxy->fd = fd; - fd->data = peerProxy; - fd->onReadable = &peerProxyRead; - fd->beforeSelect = &peerProxyBeforeSelect; - - // Make the socket non-blocking. - setNonBlocking(fd->fd); -} - -/** - * Accepts a connection sent by the master proxy. - */ -static void masterProxyAcceptConnection(PeerProxy* masterProxy) { - struct msghdr msg; - struct iovec iov[1]; - ssize_t size; - char ignored; - int incomingFd; - - // TODO: Reuse code which writes the connection. Who the heck designed - // this API anyway? - union { - struct cmsghdr cm; - char control[CMSG_SPACE(sizeof(int))]; - } control_un; - struct cmsghdr *cmptr; - msg.msg_control = control_un.control; - msg.msg_controllen = sizeof(control_un.control); - - msg.msg_name = NULL; - msg.msg_namelen = 0; - - // We sent 1 byte of data so we can detect EOF. - iov[0].iov_base = &ignored; - iov[0].iov_len = 1; - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - size = recvmsg(masterProxy->fd->fd, &msg, 0); - if (size < 0) { - if (errno == EINTR) { - // Log interruptions but otherwise ignore them. - ALOGW("recvmsg() interrupted."); - return; - } else if (errno == EAGAIN) { - // Keep waiting for the connection. - return; - } else { - LOG_ALWAYS_FATAL("Error reading connection from master: %s", - strerror(errno)); - } - } else if (size == 0) { - // EOF. - LOG_ALWAYS_FATAL("Received EOF from master."); - } - - // Extract fd from message. - if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL - && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) { - if (cmptr->cmsg_level != SOL_SOCKET) { - LOG_ALWAYS_FATAL("Expected SOL_SOCKET."); - } - if (cmptr->cmsg_type != SCM_RIGHTS) { - LOG_ALWAYS_FATAL("Expected SCM_RIGHTS."); - } - incomingFd = *((int*) CMSG_DATA(cmptr)); - } else { - LOG_ALWAYS_FATAL("Expected fd."); - } - - // The peer proxy this connection is for. - PeerProxy* peerProxy = masterProxy->connecting; - if (peerProxy == NULL) { - ALOGW("Received connection for unknown peer."); - closeWithWarning(incomingFd); - } else { - Peer* peer = masterProxy->peer; - - SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd); - if (selectableFd == NULL) { - ALOGW("Error adding fd to selector for %d.", - peerProxy->credentials.pid); - closeWithWarning(incomingFd); - peerProxyKill(peerProxy, false); - } - - peerProxySetFd(peerProxy, selectableFd); - } - - peerProxyExpectHeader(masterProxy); -} - -/** - * Frees an outgoing packet containing a connection. - */ -static void outgoingPacketFreeSocket(OutgoingPacket* packet) { - closeWithWarning(packet->socket); - outgoingPacketFree(packet); -} - -/** - * Connects two known peers. - */ -static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) { - int sockets[2]; - int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets); - if (result == -1) { - ALOGW("socketpair() error: %s", strerror(errno)); - // TODO: Send CONNECTION_FAILED packets to peers. - return; - } - - OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket)); - OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket)); - if (packetA == NULL || packetB == NULL) { - free(packetA); - free(packetB); - ALOGW("malloc() error. Failed to tell process %d that process %d is" - " dead.", peerA->credentials.pid, peerB->credentials.pid); - return; - } - - packetA->header.type = CONNECTION; - packetB->header.type = CONNECTION; - - packetA->header.credentials = peerB->credentials; - packetB->header.credentials = peerA->credentials; - - packetA->socket = sockets[0]; - packetB->socket = sockets[1]; - - packetA->free = &outgoingPacketFreeSocket; - packetB->free = &outgoingPacketFreeSocket; - - peerLock(peerA->peer); - peerProxyEnqueueOutgoingPacket(peerA, packetA); - peerProxyEnqueueOutgoingPacket(peerB, packetB); - peerUnlock(peerA->peer); -} - -/** - * Informs a peer that the peer they're trying to connect to couldn't be - * found. - */ -static void masterReportConnectionError(PeerProxy* peerProxy, - Credentials credentials) { - OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); - if (packet == NULL) { - ALOGW("malloc() error. Failed to tell process %d that process %d is" - " dead.", peerProxy->credentials.pid, credentials.pid); - return; - } - - packet->header.type = CONNECTION_ERROR; - packet->header.credentials = credentials; - packet->free = &outgoingPacketFree; - - peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet); -} - -/** - * Handles a request to be connected to another peer. - */ -static void masterHandleConnectionRequest(PeerProxy* peerProxy, - Header* header) { - Peer* master = peerProxy->peer; - pid_t targetPid = header->credentials.pid; - if (!hashmapContainsKey(peerProxy->connections, &targetPid)) { - // We haven't connected these peers yet. - PeerProxy* targetPeer - = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid); - if (targetPeer == NULL) { - // Unknown process. - masterReportConnectionError(peerProxy, header->credentials); - } else { - masterConnectPeers(peerProxy, targetPeer); - } - } - - // This packet is complete. Get ready for the next one. - peerProxyExpectHeader(peerProxy); -} - -/** - * The master told us this peer is dead. - */ -static void masterProxyHandleConnectionError(PeerProxy* masterProxy, - Header* header) { - Peer* peer = masterProxy->peer; - - // Look up the peer proxy. - pid_t pid = header->credentials.pid; - PeerProxy* peerProxy = NULL; - peerLock(peer); - peerProxy = hashmapGet(peer->peerProxies, &pid); - peerUnlock(peer); - - if (peerProxy != NULL) { - ALOGI("Couldn't connect to %d.", pid); - peerProxyKill(peerProxy, false); - } else { - ALOGW("Peer proxy for %d not found. This shouldn't happen.", pid); - } - - peerProxyExpectHeader(masterProxy); -} - -/** - * Handles a packet header. - */ -static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) { - switch (header->type) { - case CONNECTION_REQUEST: - masterHandleConnectionRequest(peerProxy, header); - break; - case CONNECTION: - masterProxyExpectConnection(peerProxy, header); - break; - case CONNECTION_ERROR: - masterProxyHandleConnectionError(peerProxy, header); - break; - case BYTES: - peerProxyExpectBytes(peerProxy, header); - break; - default: - ALOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid, - header->type); - peerProxyKill(peerProxy, false); - } -} - -/** - * Buffers input sent by peer. May be called multiple times until the entire - * buffer is filled. Returns true when the buffer is full. - */ -static bool peerProxyBufferInput(PeerProxy* peerProxy) { - Buffer* in = peerProxy->inputBuffer; - ssize_t size = bufferRead(in, peerProxy->fd->fd); - if (size < 0) { - peerProxyHandleError(peerProxy, "read"); - return false; - } else if (size == 0) { - // EOF. - ALOGI("EOF"); - peerProxyKill(peerProxy, false); - return false; - } else if (bufferReadComplete(in)) { - // We're done! - return true; - } else { - // Continue reading. - return false; - } -} - -/** - * Reads input from a peer process. - */ -static void peerProxyRead(SelectableFd* fd) { - ALOGD("Reading..."); - PeerProxy* peerProxy = (PeerProxy*) fd->data; - int state = peerProxy->inputState; - Buffer* in = peerProxy->inputBuffer; - switch (state) { - case READING_HEADER: - if (peerProxyBufferInput(peerProxy)) { - ALOGD("Header read."); - // We've read the complete header. - Header* header = (Header*) in->data; - peerProxyHandleHeader(peerProxy, header); - } - break; - case READING_BYTES: - ALOGD("Reading bytes..."); - if (peerProxyBufferInput(peerProxy)) { - ALOGD("Bytes read."); - // We have the complete packet. Notify bytes listener. - peerProxy->peer->onBytes(peerProxy->credentials, - in->data, in->size); - - // Get ready for the next packet. - peerProxyExpectHeader(peerProxy); - } - break; - case ACCEPTING_CONNECTION: - masterProxyAcceptConnection(peerProxy); - break; - default: - LOG_ALWAYS_FATAL("Unknown state: %d", state); - } -} - -static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) { - PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy)); - if (peerProxy == NULL) { - return NULL; - } - - peerProxy->inputBuffer = bufferCreate(sizeof(Header)); - if (peerProxy->inputBuffer == NULL) { - free(peerProxy); - return NULL; - } - - peerProxy->peer = peer; - peerProxy->credentials = credentials; - - // Initial state == expecting a header. - peerProxyExpectHeader(peerProxy); - - // Add this proxy to the map. Make sure the key points to the stable memory - // inside of the peer proxy itself. - pid_t* pid = &(peerProxy->credentials.pid); - hashmapPut(peer->peerProxies, pid, peerProxy); - return peerProxy; -} - -/** Accepts a connection to the master peer. */ -static void masterAcceptConnection(SelectableFd* listenerFd) { - // Accept connection. - int socket = accept(listenerFd->fd, NULL, NULL); - if (socket == -1) { - ALOGW("accept() error: %s", strerror(errno)); - return; - } - - ALOGD("Accepted connection as fd %d.", socket); - - // Get credentials. - Credentials credentials; - struct ucred ucredentials; - socklen_t credentialsSize = sizeof(struct ucred); - int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED, - &ucredentials, &credentialsSize); - // We might want to verify credentialsSize. - if (result == -1) { - ALOGW("getsockopt() error: %s", strerror(errno)); - closeWithWarning(socket); - return; - } - - // Copy values into our own structure so we know we have the types right. - credentials.pid = ucredentials.pid; - credentials.uid = ucredentials.uid; - credentials.gid = ucredentials.gid; - - ALOGI("Accepted connection from process %d.", credentials.pid); - - Peer* masterPeer = (Peer*) listenerFd->data; - - peerLock(masterPeer); - - // Make sure we don't already have a connection from that process. - PeerProxy* peerProxy - = hashmapGet(masterPeer->peerProxies, &credentials.pid); - if (peerProxy != NULL) { - peerUnlock(masterPeer); - ALOGW("Alread connected to process %d.", credentials.pid); - closeWithWarning(socket); - return; - } - - // Add connection to the selector. - SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket); - if (socketFd == NULL) { - peerUnlock(masterPeer); - ALOGW("malloc() failed."); - closeWithWarning(socket); - return; - } - - // Create a peer proxy. - peerProxy = peerProxyCreate(masterPeer, credentials); - peerUnlock(masterPeer); - if (peerProxy == NULL) { - ALOGW("malloc() failed."); - socketFd->remove = true; - closeWithWarning(socket); - } - peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals); - peerProxySetFd(peerProxy, socketFd); -} - -/** - * Creates the local peer. - */ -static Peer* peerCreate() { - Peer* peer = calloc(1, sizeof(Peer)); - if (peer == NULL) { - LOG_ALWAYS_FATAL("malloc() error."); - } - peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals); - peer->selector = selectorCreate(); - - pthread_mutexattr_t attributes; - if (pthread_mutexattr_init(&attributes) != 0) { - LOG_ALWAYS_FATAL("pthread_mutexattr_init() error."); - } - if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) { - LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error."); - } - if (pthread_mutex_init(&peer->mutex, &attributes) != 0) { - LOG_ALWAYS_FATAL("pthread_mutex_init() error."); - } - - peer->pid = getpid(); - return peer; -} - -/** The local peer. */ -static Peer* localPeer; - -/** Frees a packet of bytes. */ -static void outgoingPacketFreeBytes(OutgoingPacket* packet) { - ALOGD("Freeing outgoing packet."); - bufferFree(packet->bytes); - free(packet); -} - -/** - * Sends a packet of bytes to a remote peer. Returns 0 on success. - * - * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be - * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno - * to EINVAL if pid is the same as the local pid. - */ -int peerSendBytes(pid_t pid, const char* bytes, size_t size) { - Peer* peer = localPeer; - assert(peer != NULL); - - OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); - if (packet == NULL) { - errno = ENOMEM; - return -1; - } - - Buffer* copy = bufferCreate(size); - if (copy == NULL) { - free(packet); - errno = ENOMEM; - return -1; - } - - // Copy data. - memcpy(copy->data, bytes, size); - copy->size = size; - - packet->bytes = copy; - packet->header.type = BYTES; - packet->header.size = size; - packet->free = outgoingPacketFreeBytes; - bufferPrepareForWrite(packet->bytes); - - peerLock(peer); - - PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); - if (peerProxy == NULL) { - // The peer is already dead or we couldn't alloc memory. Either way, - // errno is set. - peerUnlock(peer); - packet->free(packet); - return -1; - } else { - peerProxyEnqueueOutgoingPacket(peerProxy, packet); - peerUnlock(peer); - selectorWakeUp(peer->selector); - return 0; - } -} - -/** Keeps track of how to free shared bytes. */ -typedef struct { - void (*free)(void* context); - void* context; -} SharedBytesFreer; - -/** Frees shared bytes. */ -static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) { - SharedBytesFreer* sharedBytesFreer - = (SharedBytesFreer*) packet->context; - sharedBytesFreer->free(sharedBytesFreer->context); - free(sharedBytesFreer); - free(packet); -} - -/** - * Sends a packet of bytes to a remote peer without copying the bytes. Calls - * free() with context after the bytes have been sent. - * - * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be - * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno - * to EINVAL if pid is the same as the local pid. - */ -int peerSendSharedBytes(pid_t pid, char* bytes, size_t size, - void (*free)(void* context), void* context) { - Peer* peer = localPeer; - assert(peer != NULL); - - OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket)); - if (packet == NULL) { - errno = ENOMEM; - return -1; - } - - Buffer* wrapper = bufferWrap(bytes, size, size); - if (wrapper == NULL) { - free(packet); - errno = ENOMEM; - return -1; - } - - SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer)); - if (sharedBytesFreer == NULL) { - free(packet); - free(wrapper); - errno = ENOMEM; - return -1; - } - sharedBytesFreer->free = free; - sharedBytesFreer->context = context; - - packet->bytes = wrapper; - packet->context = sharedBytesFreer; - packet->header.type = BYTES; - packet->header.size = size; - packet->free = &outgoingPacketFreeSharedBytes; - bufferPrepareForWrite(packet->bytes); - - peerLock(peer); - - PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true); - if (peerProxy == NULL) { - // The peer is already dead or we couldn't alloc memory. Either way, - // errno is set. - peerUnlock(peer); - packet->free(packet); - return -1; - } else { - peerProxyEnqueueOutgoingPacket(peerProxy, packet); - peerUnlock(peer); - selectorWakeUp(peer->selector); - return 0; - } -} - -/** - * Starts the master peer. The master peer differs from other peers in that - * it is responsible for connecting the other peers. You can only have one - * master peer. - * - * Goes into an I/O loop and does not return. - */ -void masterPeerInitialize(BytesListener* bytesListener, - DeathListener* deathListener) { - // Create and bind socket. - int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0); - if (listenerSocket == -1) { - LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); - } - unlink(MASTER_PATH); - int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(), - sizeof(UnixAddress)); - if (result == -1) { - LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno)); - } - - ALOGD("Listener socket: %d", listenerSocket); - - // Queue up to 16 connections. - result = listen(listenerSocket, 16); - if (result != 0) { - LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno)); - } - - // Make socket non-blocking. - setNonBlocking(listenerSocket); - - // Create the peer for this process. Fail if we already have one. - if (localPeer != NULL) { - LOG_ALWAYS_FATAL("Peer is already initialized."); - } - localPeer = peerCreate(); - if (localPeer == NULL) { - LOG_ALWAYS_FATAL("malloc() failed."); - } - localPeer->master = true; - localPeer->onBytes = bytesListener; - localPeer->onDeath = deathListener; - - // Make listener socket selectable. - SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket); - if (listenerFd == NULL) { - LOG_ALWAYS_FATAL("malloc() error."); - } - listenerFd->data = localPeer; - listenerFd->onReadable = &masterAcceptConnection; -} - -/** - * Starts a local peer. - * - * Goes into an I/O loop and does not return. - */ -void peerInitialize(BytesListener* bytesListener, - DeathListener* deathListener) { - // Connect to master peer. - int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0); - if (masterSocket == -1) { - LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno)); - } - int result = connect(masterSocket, (SocketAddress*) getMasterAddress(), - sizeof(UnixAddress)); - if (result != 0) { - LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno)); - } - - // Create the peer for this process. Fail if we already have one. - if (localPeer != NULL) { - LOG_ALWAYS_FATAL("Peer is already initialized."); - } - localPeer = peerCreate(); - if (localPeer == NULL) { - LOG_ALWAYS_FATAL("malloc() failed."); - } - localPeer->onBytes = bytesListener; - localPeer->onDeath = deathListener; - - // Make connection selectable. - SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket); - if (masterFd == NULL) { - LOG_ALWAYS_FATAL("malloc() error."); - } - - // Create a peer proxy for the master peer. - PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS); - if (masterProxy == NULL) { - LOG_ALWAYS_FATAL("malloc() error."); - } - peerProxySetFd(masterProxy, masterFd); - masterProxy->master = true; - localPeer->masterProxy = masterProxy; -} - -/** Starts the master peer I/O loop. Doesn't return. */ -void peerLoop() { - assert(localPeer != NULL); - - // Start selector. - selectorLoop(localPeer->selector); -} - |