diff options
Diffstat (limited to 'android/async-socket.c')
-rw-r--r-- | android/async-socket.c | 1314 |
1 files changed, 1314 insertions, 0 deletions
diff --git a/android/async-socket.c b/android/async-socket.c new file mode 100644 index 0000000..ab4e4b6 --- /dev/null +++ b/android/async-socket.c @@ -0,0 +1,1314 @@ +/* + * Copyright (C) 2012 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Encapsulates exchange protocol between the emulator, and an Android device + * that is connected to the host via USB. The communication is established over + * a TCP port forwarding, enabled by ADB. + */ + +#include "android/utils/debug.h" +#include "android/async-socket-connector.h" +#include "android/async-socket.h" +#include "utils/panic.h" +#include "iolooper.h" + +#define E(...) derror(__VA_ARGS__) +#define W(...) dwarning(__VA_ARGS__) +#define D(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__) +#define D_ACTIVE VERBOSE_CHECK(asyncsocket) + +#define TRACE_ON 0 + +#if TRACE_ON +#define T(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__) +#else +#define T(...) +#endif + +/******************************************************************************** + * Asynchronous Socket internal API declarations + *******************************************************************************/ + +/* Gets socket's address string. */ +static const char* _async_socket_string(AsyncSocket* as); + +/* Gets socket's looper. */ +static Looper* _async_socket_get_looper(AsyncSocket* as); + +/* Handler for the I/O time out. + * Param: + * as - Asynchronous socket for the I/O. + * asio - Desciptor for the timed out I/O. + */ +static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as, + AsyncSocketIO* asio); + +/******************************************************************************** + * Asynchronous Socket Reader / Writer + *******************************************************************************/ + +struct AsyncSocketIO { + /* Next I/O in the reader, or writer list. */ + AsyncSocketIO* next; + /* Asynchronous socket for this I/O. */ + AsyncSocket* as; + /* Timer used for time outs on this I/O. */ + LoopTimer timer[1]; + /* An opaque pointer associated with this I/O. */ + void* io_opaque; + /* Buffer where to read / write data. */ + uint8_t* buffer; + /* Bytes to transfer through the socket for this I/O. */ + uint32_t to_transfer; + /* Bytes thransferred through the socket in this I/O. */ + uint32_t transferred; + /* I/O callback for this I/O. */ + on_as_io_cb on_io; + /* I/O type selector: 1 - read, 0 - write. */ + int is_io_read; + /* State of the I/O. */ + AsyncIOState state; + /* Number of outstanding references to the I/O. */ + int ref_count; + /* Deadline for this I/O */ + Duration deadline; +}; + +/* + * Recycling I/O instances. + * Since AsyncSocketIO instances are not that large, it makes sence to recycle + * them for faster allocation, rather than allocating and freeing them for each + * I/O on the socket. + */ + +/* List of recycled I/O descriptors. */ +static AsyncSocketIO* _asio_recycled = NULL; +/* Number of I/O descriptors that are recycled in the _asio_recycled list. */ +static int _recycled_asio_count = 0; +/* Maximum number of I/O descriptors that can be recycled. */ +static const int _max_recycled_asio_num = 32; + +/* Handler for an I/O time-out timer event. + * When this routine is invoked, it indicates that a time out has occurred on an + * I/O. + * Param: + * opaque - AsyncSocketIO instance representing the timed out I/O. + */ +static void _on_async_socket_io_timed_out(void* opaque); + +/* Creates new I/O descriptor. + * Param: + * as - Asynchronous socket for the I/O. + * is_io_read - I/O type selector: 1 - read, 0 - write. + * buffer, len - Reader / writer buffer address. + * io_cb - Callback for this reader / writer. + * io_opaque - An opaque pointer associated with the I/O. + * deadline - Deadline to complete the I/O. + * Return: + * Initialized AsyncSocketIO instance. + */ +static AsyncSocketIO* +_async_socket_rw_new(AsyncSocket* as, + int is_io_read, + void* buffer, + uint32_t len, + on_as_io_cb io_cb, + void* io_opaque, + Duration deadline) +{ + /* Lookup in the recycler first. */ + AsyncSocketIO* asio = _asio_recycled; + if (asio != NULL) { + /* Pull the descriptor from recycler. */ + _asio_recycled = asio->next; + _recycled_asio_count--; + } else { + /* No recycled descriptors. Allocate new one. */ + ANEW0(asio); + } + + asio->next = NULL; + asio->as = as; + asio->is_io_read = is_io_read; + asio->buffer = (uint8_t*)buffer; + asio->to_transfer = len; + asio->transferred = 0; + asio->on_io = io_cb; + asio->io_opaque = io_opaque; + asio->state = ASIO_STATE_QUEUED; + asio->ref_count = 1; + asio->deadline = deadline; + loopTimer_init(asio->timer, _async_socket_get_looper(as), + _on_async_socket_io_timed_out, asio); + loopTimer_startAbsolute(asio->timer, deadline); + + /* Reference socket that is holding this I/O. */ + async_socket_reference(as); + + T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data", + _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len); + + return asio; +} + +/* Destroys and frees I/O descriptor. */ +static void +_async_socket_io_free(AsyncSocketIO* asio) +{ + AsyncSocket* const as = asio->as; + + T("ASocket %s: %s I/O descriptor %p is destroyed.", + _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); + + loopTimer_done(asio->timer); + + /* Try to recycle it first, and free the memory if recycler is full. */ + if (_recycled_asio_count < _max_recycled_asio_num) { + asio->next = _asio_recycled; + _asio_recycled = asio; + _recycled_asio_count++; + } else { + AFREE(asio); + } + + /* Release socket that is holding this I/O. */ + async_socket_release(as); +} + +/* An I/O has been finished and its descriptor is about to be discarded. */ +static void +_async_socket_io_finished(AsyncSocketIO* asio) +{ + /* Notify the client of the I/O that I/O is finished. */ + asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED); +} + +int +async_socket_io_reference(AsyncSocketIO* asio) +{ + assert(asio->ref_count > 0); + asio->ref_count++; + return asio->ref_count; +} + +int +async_socket_io_release(AsyncSocketIO* asio) +{ + assert(asio->ref_count > 0); + asio->ref_count--; + if (asio->ref_count == 0) { + _async_socket_io_finished(asio); + /* Last reference has been dropped. Destroy this object. */ + _async_socket_io_free(asio); + return 0; + } + return asio->ref_count; +} + +/* Creates new asynchronous socket reader. + * Param: + * as - Asynchronous socket for the reader. + * buffer, len - Reader's buffer. + * io_cb - Reader's callback. + * reader_opaque - An opaque pointer associated with the reader. + * deadline - Deadline to complete the operation. + * Return: + * An initialized AsyncSocketIO intance. + */ +static AsyncSocketIO* +_async_socket_reader_new(AsyncSocket* as, + void* buffer, + uint32_t len, + on_as_io_cb io_cb, + void* reader_opaque, + Duration deadline) +{ + AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb, + reader_opaque, deadline); + return asio; +} + +/* Creates new asynchronous socket writer. + * Param: + * as - Asynchronous socket for the writer. + * buffer, len - Writer's buffer. + * io_cb - Writer's callback. + * writer_opaque - An opaque pointer associated with the writer. + * deadline - Deadline to complete the operation. + * Return: + * An initialized AsyncSocketIO intance. + */ +static AsyncSocketIO* +_async_socket_writer_new(AsyncSocket* as, + const void* buffer, + uint32_t len, + on_as_io_cb io_cb, + void* writer_opaque, + Duration deadline) +{ + AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len, + io_cb, writer_opaque, + deadline); + return asio; +} + +/* I/O timed out. */ +static void +_on_async_socket_io_timed_out(void* opaque) +{ + AsyncSocketIO* const asio = (AsyncSocketIO*)opaque; + AsyncSocket* const as = asio->as; + + D("ASocket %s: %s I/O with deadline %lld has timed out at %lld", + _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", + asio->deadline, async_socket_deadline(as, 0)); + + /* Reference while in callback. */ + async_socket_io_reference(asio); + _async_socket_io_timed_out(asio->as, asio); + async_socket_io_release(asio); +} + +/******************************************************************************** + * Public Asynchronous Socket I/O API + *******************************************************************************/ + +AsyncSocket* +async_socket_io_get_socket(const AsyncSocketIO* asio) +{ + async_socket_reference(asio->as); + return asio->as; +} + +void +async_socket_io_cancel_time_out(AsyncSocketIO* asio) +{ + loopTimer_stop(asio->timer); +} + +void* +async_socket_io_get_io_opaque(const AsyncSocketIO* asio) +{ + return asio->io_opaque; +} + +void* +async_socket_io_get_client_opaque(const AsyncSocketIO* asio) +{ + return async_socket_get_client_opaque(asio->as); +} + +void* +async_socket_io_get_buffer_info(const AsyncSocketIO* asio, + uint32_t* transferred, + uint32_t* to_transfer) +{ + if (transferred != NULL) { + *transferred = asio->transferred; + } + if (to_transfer != NULL) { + *to_transfer = asio->to_transfer; + } + return asio->buffer; +} + +void* +async_socket_io_get_buffer(const AsyncSocketIO* asio) +{ + return asio->buffer; +} + +uint32_t +async_socket_io_get_transferred(const AsyncSocketIO* asio) +{ + return asio->transferred; +} + +uint32_t +async_socket_io_get_to_transfer(const AsyncSocketIO* asio) +{ + return asio->to_transfer; +} + +int +async_socket_io_is_read(const AsyncSocketIO* asio) +{ + return asio->is_io_read; +} + +/******************************************************************************** + * Asynchronous Socket internals + *******************************************************************************/ + +struct AsyncSocket { + /* TCP address for the socket. */ + SockAddress address; + /* Connection callback for this socket. */ + on_as_connection_cb on_connection; + /* An opaque pointer associated with this socket by the client. */ + void* client_opaque; + /* I/O looper for asynchronous I/O on the socket. */ + Looper* looper; + /* I/O descriptor for asynchronous I/O on the socket. */ + LoopIo io[1]; + /* Timer to use for reconnection attempts. */ + LoopTimer reconnect_timer[1]; + /* Head of the list of the active readers. */ + AsyncSocketIO* readers_head; + /* Tail of the list of the active readers. */ + AsyncSocketIO* readers_tail; + /* Head of the list of the active writers. */ + AsyncSocketIO* writers_head; + /* Tail of the list of the active writers. */ + AsyncSocketIO* writers_tail; + /* Socket's file descriptor. */ + int fd; + /* Timeout to use for reconnection attempts. */ + int reconnect_to; + /* Number of outstanding references to the socket. */ + int ref_count; + /* Flags whether (1) or not (0) socket owns the looper. */ + int owns_looper; +}; + +static const char* +_async_socket_string(AsyncSocket* as) +{ + return sock_address_to_string(&as->address); +} + +static Looper* +_async_socket_get_looper(AsyncSocket* as) +{ + return as->looper; +} + +/* Pulls first reader out of the list. + * Param: + * as - Initialized AsyncSocket instance. + * Return: + * First I/O pulled out of the list, or NULL if there are no I/O in the list. + * Note that the caller is responsible for releasing the I/O object returned + * from this routine. + */ +static AsyncSocketIO* +_async_socket_pull_first_io(AsyncSocket* as, + AsyncSocketIO** list_head, + AsyncSocketIO** list_tail) +{ + AsyncSocketIO* const ret = *list_head; + if (ret != NULL) { + *list_head = ret->next; + ret->next = NULL; + if (*list_head == NULL) { + *list_tail = NULL; + } + } + return ret; +} + +/* Pulls first reader out of the list. + * Param: + * as - Initialized AsyncSocket instance. + * Return: + * First reader pulled out of the list, or NULL if there are no readers in the + * list. + * Note that the caller is responsible for releasing the I/O object returned + * from this routine. + */ +static AsyncSocketIO* +_async_socket_pull_first_reader(AsyncSocket* as) +{ + return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail); +} + +/* Pulls first writer out of the list. + * Param: + * as - Initialized AsyncSocket instance. + * Return: + * First writer pulled out of the list, or NULL if there are no writers in the + * list. + * Note that the caller is responsible for releasing the I/O object returned + * from this routine. + */ +static AsyncSocketIO* +_async_socket_pull_first_writer(AsyncSocket* as) +{ + return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail); +} + +/* Removes an I/O descriptor from a list of active I/O. + * Param: + * as - Initialized AsyncSocket instance. + * list_head, list_tail - Pointers to the list head and tail. + * io - I/O to remove. + * Return: + * Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list. + */ +static int +_async_socket_remove_io(AsyncSocket* as, + AsyncSocketIO** list_head, + AsyncSocketIO** list_tail, + AsyncSocketIO* io) +{ + AsyncSocketIO* prev = NULL; + + while (*list_head != NULL && io != *list_head) { + prev = *list_head; + list_head = &((*list_head)->next); + } + if (*list_head == NULL) { + D("%s: I/O %p is not found in the list for socket '%s'", + __FUNCTION__, io, _async_socket_string(as)); + return 0; + } + + *list_head = io->next; + if (prev != NULL) { + prev->next = io->next; + } + if (*list_tail == io) { + *list_tail = prev; + } + + /* Release I/O adjusting reference added when I/O has been saved in the list. */ + async_socket_io_release(io); + + return 1; +} + +/* Advances to the next I/O in the list. + * Param: + * as - Initialized AsyncSocket instance. + * list_head, list_tail - Pointers to the list head and tail. + */ +static void +_async_socket_advance_io(AsyncSocket* as, + AsyncSocketIO** list_head, + AsyncSocketIO** list_tail) +{ + AsyncSocketIO* first_io = *list_head; + if (first_io != NULL) { + *list_head = first_io->next; + first_io->next = NULL; + } + if (*list_head == NULL) { + *list_tail = NULL; + } + if (first_io != NULL) { + /* Release I/O removed from the head of the list. */ + async_socket_io_release(first_io); + } +} + +/* Advances to the next reader in the list. + * Param: + * as - Initialized AsyncSocket instance. + */ +static void +_async_socket_advance_reader(AsyncSocket* as) +{ + _async_socket_advance_io(as, &as->readers_head, &as->readers_tail); +} + +/* Advances to the next writer in the list. + * Param: + * as - Initialized AsyncSocket instance. + */ +static void +_async_socket_advance_writer(AsyncSocket* as) +{ + _async_socket_advance_io(as, &as->writers_head, &as->writers_tail); +} + +/* Completes an I/O. + * Param: + * as - Initialized AsyncSocket instance. + * asio - I/O to complete. + * Return: + * One of AsyncIOAction values. + */ +static AsyncIOAction +_async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio) +{ + T("ASocket %s: %s I/O %p is completed.", + _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); + + /* Stop the timer. */ + async_socket_io_cancel_time_out(asio); + + return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED); +} + +/* Timeouts an I/O. + * Param: + * as - Initialized AsyncSocket instance. + * asio - An I/O that has timed out. + * Return: + * One of AsyncIOAction values. + */ +static AsyncIOAction +_async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio) +{ + T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld", + _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio, + asio->deadline, async_socket_deadline(as, 0)); + + /* Report to the client. */ + const AsyncIOAction action = asio->on_io(asio->io_opaque, asio, + ASIO_STATE_TIMED_OUT); + + /* Remove the I/O from a list of active I/O for actions other than retry. */ + if (action != ASIO_ACTION_RETRY) { + if (asio->is_io_read) { + _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio); + } else { + _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio); + } + } + + return action; +} + +/* Cancels an I/O. + * Param: + * as - Initialized AsyncSocket instance. + * asio - An I/O to cancel. + * Return: + * One of AsyncIOAction values. + */ +static AsyncIOAction +_async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio) +{ + T("ASocket %s: %s I/O %p is cancelled.", + _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); + + /* Stop the timer. */ + async_socket_io_cancel_time_out(asio); + + return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED); +} + +/* Reports an I/O failure. + * Param: + * as - Initialized AsyncSocket instance. + * asio - An I/O that has failed. Can be NULL for general failures. + * failure - Failure (errno) that has occurred. + * Return: + * One of AsyncIOAction values. + */ +static AsyncIOAction +_async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure) +{ + T("ASocket %s: %s I/O %p has failed: %d -> %s", + _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio, + failure, strerror(failure)); + + /* Stop the timer. */ + async_socket_io_cancel_time_out(asio); + + errno = failure; + return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED); +} + +/* Cancels all the active socket readers. + * Param: + * as - Initialized AsyncSocket instance. + */ +static void +_async_socket_cancel_readers(AsyncSocket* as) +{ + while (as->readers_head != NULL) { + AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as); + /* We ignore action returned from the cancellation callback, since we're + * in a disconnected state here. */ + _async_socket_cancel_io(as, to_cancel); + async_socket_io_release(to_cancel); + } +} + +/* Cancels all the active socket writers. + * Param: + * as - Initialized AsyncSocket instance. + */ +static void +_async_socket_cancel_writers(AsyncSocket* as) +{ + while (as->writers_head != NULL) { + AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as); + /* We ignore action returned from the cancellation callback, since we're + * in a disconnected state here. */ + _async_socket_cancel_io(as, to_cancel); + async_socket_io_release(to_cancel); + } +} + +/* Cancels all the I/O on the socket. */ +static void +_async_socket_cancel_all_io(AsyncSocket* as) +{ + /* Stop the reconnection timer. */ + loopTimer_stop(as->reconnect_timer); + + /* Stop read / write on the socket. */ + loopIo_dontWantWrite(as->io); + loopIo_dontWantRead(as->io); + + /* Cancel active readers and writers. */ + _async_socket_cancel_readers(as); + _async_socket_cancel_writers(as); +} + +/* Closes socket handle used by the async socket. + * Param: + * as - Initialized AsyncSocket instance. + */ +static void +_async_socket_close_socket(AsyncSocket* as) +{ + if (as->fd >= 0) { + T("ASocket %s: Socket handle %d is closed.", + _async_socket_string(as), as->fd); + loopIo_done(as->io); + socket_close(as->fd); + as->fd = -1; + } +} + +/* Destroys AsyncSocket instance. + * Param: + * as - Initialized AsyncSocket instance. + */ +static void +_async_socket_free(AsyncSocket* as) +{ + if (as != NULL) { + T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as)); + + /* Close socket. */ + _async_socket_close_socket(as); + + /* Free allocated resources. */ + if (as->looper != NULL) { + loopTimer_done(as->reconnect_timer); + if (as->owns_looper) { + looper_free(as->looper); + } + } + sock_address_done(&as->address); + AFREE(as); + } +} + +/* Starts reconnection attempts after connection has been lost. + * Param: + * as - Initialized AsyncSocket instance. + * to - Milliseconds to wait before reconnection attempt. + */ +static void +_async_socket_reconnect(AsyncSocket* as, int to) +{ + T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to); + + /* Make sure that no I/O is active, and socket is closed before we + * reconnect. */ + _async_socket_cancel_all_io(as); + + /* Set the timer for reconnection attempt. */ + loopTimer_startRelative(as->reconnect_timer, to); +} + +/******************************************************************************** + * Asynchronous Socket callbacks + *******************************************************************************/ + +/* A callback that is invoked when socket gets disconnected. + * Param: + * as - Initialized AsyncSocket instance. + */ +static void +_on_async_socket_disconnected(AsyncSocket* as) +{ + /* Save error to restore it for the client's callback. */ + const int save_errno = errno; + AsyncIOAction action = ASIO_ACTION_ABORT; + + D("ASocket %s: Disconnected.", _async_socket_string(as)); + + /* Cancel all the I/O on this socket. */ + _async_socket_cancel_all_io(as); + + /* Close the socket. */ + _async_socket_close_socket(as); + + /* Restore errno, and invoke client's callback. */ + errno = save_errno; + action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED); + + if (action == ASIO_ACTION_RETRY) { + /* Client requested reconnection. */ + _async_socket_reconnect(as, as->reconnect_to); + } +} + +/* A callback that is invoked on socket's I/O failure. + * Param: + * as - Initialized AsyncSocket instance. + * asio - Descriptor for the failed I/O. Can be NULL for general failures. + */ +static AsyncIOAction +_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio) +{ + D("ASocket %s: %s I/O failure: %d -> %s", + _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", + errno, strerror(errno)); + + /* Report the failure. */ + return _async_socket_io_failure(as, asio, errno); +} + +/* A callback that is invoked when there is data available to read. + * Param: + * as - Initialized AsyncSocket instance. + * Return: + * 0 on success, or -1 on failure. Failure returned from this routine will + * skip writes (if awailable) behind this read. + */ +static int +_on_async_socket_recv(AsyncSocket* as) +{ + AsyncIOAction action; + + /* Get current reader. */ + AsyncSocketIO* const asr = as->readers_head; + if (asr == NULL) { + D("ASocket %s: No reader is available.", _async_socket_string(as)); + loopIo_dontWantRead(as->io); + return 0; + } + + /* Reference the reader while we're working with it in this callback. */ + async_socket_io_reference(asr); + + /* Bump I/O state, and inform the client that I/O is in progress. */ + if (asr->state == ASIO_STATE_QUEUED) { + asr->state = ASIO_STATE_STARTED; + } else { + asr->state = ASIO_STATE_CONTINUES; + } + action = asr->on_io(asr->io_opaque, asr, asr->state); + if (action == ASIO_ACTION_ABORT) { + D("ASocket %s: Read is aborted by the client.", _async_socket_string(as)); + /* Move on to the next reader. */ + _async_socket_advance_reader(as); + /* Lets see if there are still active readers, and enable, or disable + * read I/O callback accordingly. */ + if (as->readers_head != NULL) { + loopIo_wantRead(as->io); + } else { + loopIo_dontWantRead(as->io); + } + async_socket_io_release(asr); + return 0; + } + + /* Read next chunk of data. */ + int res = socket_recv(as->fd, asr->buffer + asr->transferred, + asr->to_transfer - asr->transferred); + while (res < 0 && errno == EINTR) { + res = socket_recv(as->fd, asr->buffer + asr->transferred, + asr->to_transfer - asr->transferred); + } + + if (res == 0) { + /* Socket has been disconnected. */ + errno = ECONNRESET; + _on_async_socket_disconnected(as); + async_socket_io_release(asr); + return -1; + } + + if (res < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + /* Yield to writes behind this read. */ + loopIo_wantRead(as->io); + async_socket_io_release(asr); + return 0; + } + + /* An I/O error. */ + action = _on_async_socket_failure(as, asr); + if (action != ASIO_ACTION_RETRY) { + D("ASocket %s: Read is aborted on failure.", _async_socket_string(as)); + /* Move on to the next reader. */ + _async_socket_advance_reader(as); + /* Lets see if there are still active readers, and enable, or disable + * read I/O callback accordingly. */ + if (as->readers_head != NULL) { + loopIo_wantRead(as->io); + } else { + loopIo_dontWantRead(as->io); + } + } + async_socket_io_release(asr); + return -1; + } + + /* Update the reader's descriptor. */ + asr->transferred += res; + if (asr->transferred == asr->to_transfer) { + /* This read is completed. Move on to the next reader. */ + _async_socket_advance_reader(as); + + /* Notify reader completion. */ + _async_socket_complete_io(as, asr); + } + + /* Lets see if there are still active readers, and enable, or disable read + * I/O callback accordingly. */ + if (as->readers_head != NULL) { + loopIo_wantRead(as->io); + } else { + loopIo_dontWantRead(as->io); + } + + async_socket_io_release(asr); + + return 0; +} + +/* A callback that is invoked when there is data available to write. + * Param: + * as - Initialized AsyncSocket instance. + * Return: + * 0 on success, or -1 on failure. Failure returned from this routine will + * skip reads (if awailable) behind this write. + */ +static int +_on_async_socket_send(AsyncSocket* as) +{ + AsyncIOAction action; + + /* Get current writer. */ + AsyncSocketIO* const asw = as->writers_head; + if (asw == NULL) { + D("ASocket %s: No writer is available.", _async_socket_string(as)); + loopIo_dontWantWrite(as->io); + return 0; + } + + /* Reference the writer while we're working with it in this callback. */ + async_socket_io_reference(asw); + + /* Bump I/O state, and inform the client that I/O is in progress. */ + if (asw->state == ASIO_STATE_QUEUED) { + asw->state = ASIO_STATE_STARTED; + } else { + asw->state = ASIO_STATE_CONTINUES; + } + action = asw->on_io(asw->io_opaque, asw, asw->state); + if (action == ASIO_ACTION_ABORT) { + D("ASocket %s: Write is aborted by the client.", _async_socket_string(as)); + /* Move on to the next writer. */ + _async_socket_advance_writer(as); + /* Lets see if there are still active writers, and enable, or disable + * write I/O callback accordingly. */ + if (as->writers_head != NULL) { + loopIo_wantWrite(as->io); + } else { + loopIo_dontWantWrite(as->io); + } + async_socket_io_release(asw); + return 0; + } + + /* Write next chunk of data. */ + int res = socket_send(as->fd, asw->buffer + asw->transferred, + asw->to_transfer - asw->transferred); + while (res < 0 && errno == EINTR) { + res = socket_send(as->fd, asw->buffer + asw->transferred, + asw->to_transfer - asw->transferred); + } + + if (res == 0) { + /* Socket has been disconnected. */ + errno = ECONNRESET; + _on_async_socket_disconnected(as); + async_socket_io_release(asw); + return -1; + } + + if (res < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + /* Yield to reads behind this write. */ + loopIo_wantWrite(as->io); + async_socket_io_release(asw); + return 0; + } + + /* An I/O error. */ + action = _on_async_socket_failure(as, asw); + if (action != ASIO_ACTION_RETRY) { + D("ASocket %s: Write is aborted on failure.", _async_socket_string(as)); + /* Move on to the next writer. */ + _async_socket_advance_writer(as); + /* Lets see if there are still active writers, and enable, or disable + * write I/O callback accordingly. */ + if (as->writers_head != NULL) { + loopIo_wantWrite(as->io); + } else { + loopIo_dontWantWrite(as->io); + } + } + async_socket_io_release(asw); + return -1; + } + + /* Update the writer descriptor. */ + asw->transferred += res; + if (asw->transferred == asw->to_transfer) { + /* This write is completed. Move on to the next writer. */ + _async_socket_advance_writer(as); + + /* Notify writer completion. */ + _async_socket_complete_io(as, asw); + } + + /* Lets see if there are still active writers, and enable, or disable write + * I/O callback accordingly. */ + if (as->writers_head != NULL) { + loopIo_wantWrite(as->io); + } else { + loopIo_dontWantWrite(as->io); + } + + async_socket_io_release(asw); + + return 0; +} + +/* A callback that is invoked when an I/O is available on socket. + * Param: + * as - Initialized AsyncSocket instance. + * fd - Socket's file descriptor. + * events - LOOP_IO_READ | LOOP_IO_WRITE bitmask. + */ +static void +_on_async_socket_io(void* opaque, int fd, unsigned events) +{ + AsyncSocket* const as = (AsyncSocket*)opaque; + + /* Reference the socket while we're working with it in this callback. */ + async_socket_reference(as); + + if ((events & LOOP_IO_READ) != 0) { + if (_on_async_socket_recv(as) != 0) { + async_socket_release(as); + return; + } + } + + if ((events & LOOP_IO_WRITE) != 0) { + if (_on_async_socket_send(as) != 0) { + async_socket_release(as); + return; + } + } + + async_socket_release(as); +} + +/* A callback that is invoked by asynchronous socket connector on connection + * events. + * Param: + * opaque - Initialized AsyncSocket instance. + * connector - Connector that is used to connect this socket. + * event - Connection event. + * Return: + * One of AsyncIOAction values. + */ +static AsyncIOAction +_on_connector_events(void* opaque, + AsyncSocketConnector* connector, + AsyncIOState event) +{ + AsyncIOAction action; + AsyncSocket* const as = (AsyncSocket*)opaque; + + /* Reference the socket while we're working with it in this callback. */ + async_socket_reference(as); + + if (event == ASIO_STATE_SUCCEEDED) { + /* Accept the connection. */ + as->fd = async_socket_connector_pull_fd(connector); + loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as); + } + + /* Invoke client's callback. */ + action = as->on_connection(as->client_opaque, as, event); + if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) { + /* For whatever reason the client didn't want to keep this connection. + * Close it. */ + D("ASocket %s: Connection is discarded by the client.", + _async_socket_string(as)); + _async_socket_close_socket(as); + } + + if (action != ASIO_ACTION_RETRY) { + async_socket_connector_release(connector); + } + + async_socket_release(as); + + return action; +} + +/* Timer callback invoked to reconnect the lost connection. + * Param: + * as - Initialized AsyncSocket instance. + */ +void +_on_async_socket_reconnect(void* opaque) +{ + AsyncSocket* as = (AsyncSocket*)opaque; + + /* Reference the socket while we're working with it in this callback. */ + async_socket_reference(as); + async_socket_connect(as, as->reconnect_to); + async_socket_release(as); +} + + +/******************************************************************************** + * Android Device Socket public API + *******************************************************************************/ + +AsyncSocket* +async_socket_new(int port, + int reconnect_to, + on_as_connection_cb client_cb, + void* client_opaque, + Looper* looper) +{ + AsyncSocket* as; + + if (client_cb == NULL) { + E("Invalid client_cb parameter"); + return NULL; + } + + ANEW0(as); + + as->fd = -1; + as->client_opaque = client_opaque; + as->on_connection = client_cb; + as->readers_head = as->readers_tail = NULL; + as->reconnect_to = reconnect_to; + as->ref_count = 1; + sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port); + if (looper == NULL) { + as->looper = looper_newCore(); + if (as->looper == NULL) { + E("Unable to create I/O looper for async socket '%s'", + _async_socket_string(as)); + client_cb(client_opaque, as, ASIO_STATE_FAILED); + _async_socket_free(as); + return NULL; + } + as->owns_looper = 1; + } else { + as->looper = looper; + as->owns_looper = 0; + } + + loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as); + + T("ASocket %s: Descriptor is created.", _async_socket_string(as)); + + return as; +} + +int +async_socket_reference(AsyncSocket* as) +{ + assert(as->ref_count > 0); + as->ref_count++; + return as->ref_count; +} + +int +async_socket_release(AsyncSocket* as) +{ + assert(as->ref_count > 0); + as->ref_count--; + if (as->ref_count == 0) { + /* Last reference has been dropped. Destroy this object. */ + _async_socket_cancel_all_io(as); + _async_socket_free(as); + return 0; + } + return as->ref_count; +} + +void +async_socket_connect(AsyncSocket* as, int retry_to) +{ + T("ASocket %s: Handling connection request for %dms...", + _async_socket_string(as), retry_to); + + AsyncSocketConnector* const connector = + async_socket_connector_new(&as->address, retry_to, _on_connector_events, + as, as->looper); + if (connector != NULL) { + async_socket_connector_connect(connector); + } else { + as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED); + } +} + +void +async_socket_disconnect(AsyncSocket* as) +{ + T("ASocket %s: Handling disconnection request...", _async_socket_string(as)); + + if (as != NULL) { + _async_socket_cancel_all_io(as); + _async_socket_close_socket(as); + } +} + +void +async_socket_reconnect(AsyncSocket* as, int retry_to) +{ + T("ASocket %s: Handling reconnection request for %dms...", + _async_socket_string(as), retry_to); + + _async_socket_cancel_all_io(as); + _async_socket_close_socket(as); + _async_socket_reconnect(as, retry_to); +} + +void +async_socket_read_abs(AsyncSocket* as, + void* buffer, uint32_t len, + on_as_io_cb reader_cb, + void* reader_opaque, + Duration deadline) +{ + T("ASocket %s: Handling read for %d bytes with deadline %lld...", + _async_socket_string(as), len, deadline); + + AsyncSocketIO* const asr = + _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque, + deadline); + if (async_socket_is_connected(as)) { + /* Add new reader to the list. Note that we use initial reference from I/O + * 'new' routine as "in the list" reference counter. */ + if (as->readers_head == NULL) { + as->readers_head = as->readers_tail = asr; + } else { + as->readers_tail->next = asr; + as->readers_tail = asr; + } + loopIo_wantRead(as->io); + } else { + D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as)); + errno = ECONNRESET; + reader_cb(reader_opaque, asr, ASIO_STATE_FAILED); + async_socket_io_release(asr); + } +} + +void +async_socket_read_rel(AsyncSocket* as, + void* buffer, uint32_t len, + on_as_io_cb reader_cb, + void* reader_opaque, + int to) +{ + const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to : + DURATION_INFINITE; + async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl); +} + +void +async_socket_write_abs(AsyncSocket* as, + const void* buffer, uint32_t len, + on_as_io_cb writer_cb, + void* writer_opaque, + Duration deadline) +{ + T("ASocket %s: Handling write for %d bytes with deadline %lld...", + _async_socket_string(as), len, deadline); + + AsyncSocketIO* const asw = + _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque, + deadline); + if (async_socket_is_connected(as)) { + /* Add new writer to the list. Note that we use initial reference from I/O + * 'new' routine as "in the list" reference counter. */ + if (as->writers_head == NULL) { + as->writers_head = as->writers_tail = asw; + } else { + as->writers_tail->next = asw; + as->writers_tail = asw; + } + loopIo_wantWrite(as->io); + } else { + D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as)); + errno = ECONNRESET; + writer_cb(writer_opaque, asw, ASIO_STATE_FAILED); + async_socket_io_release(asw); + } +} + +void +async_socket_write_rel(AsyncSocket* as, + const void* buffer, uint32_t len, + on_as_io_cb writer_cb, + void* writer_opaque, + int to) +{ + const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to : + DURATION_INFINITE; + async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl); +} + +void* +async_socket_get_client_opaque(const AsyncSocket* as) +{ + return as->client_opaque; +} + +Duration +async_socket_deadline(AsyncSocket* as, int rel) +{ + return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel : + DURATION_INFINITE; +} + +int +async_socket_get_port(const AsyncSocket* as) +{ + return sock_address_get_port(&as->address); +} + +int +async_socket_is_connected(const AsyncSocket* as) +{ + return as->fd >= 0; +} |