aboutsummaryrefslogtreecommitdiffstats
path: root/android/async-socket.c
diff options
context:
space:
mode:
authorVladimir Chtchetkine <vchtchetkine@google.com>2012-04-02 07:48:19 -0700
committerVladimir Chtchetkine <vchtchetkine@google.com>2012-04-02 07:48:19 -0700
commit6dc5c2cef91004488f04fc6e9c0946f6d3a29705 (patch)
tree41fdf1c93fef544aea1a08a5066dbb3f26df6351 /android/async-socket.c
parenta7383ef4eb8d3863c8d582ea0d6b2ddb42125cba (diff)
downloadexternal_qemu-6dc5c2cef91004488f04fc6e9c0946f6d3a29705.zip
external_qemu-6dc5c2cef91004488f04fc6e9c0946f6d3a29705.tar.gz
external_qemu-6dc5c2cef91004488f04fc6e9c0946f6d3a29705.tar.bz2
Refactor asynchronous socket APIs
The initial implementation was a bit too complex in two ways: 1. Each component (the connector, and async socket) had its own set of state and action enums, which was confusing, required value translation, and was not really needed. So, all these enums have been combined into two common enums that are now declared in android/async-io-common.h 2. Too many callbacks, which really complicated implementation of the clients. It is much more efficient to have just two callbacks (one to monitor connection, and another to monitor I/O), letting the client to dispatch on particular event (success/timeout/etc.) This CL fixes these two issues. Change-Id: I545c93dee2e9e9c72c1d25e6cd218c8680933ee3
Diffstat (limited to 'android/async-socket.c')
-rw-r--r--android/async-socket.c402
1 files changed, 261 insertions, 141 deletions
diff --git a/android/async-socket.c b/android/async-socket.c
index 13e15ca..4be69f6 100644
--- a/android/async-socket.c
+++ b/android/async-socket.c
@@ -37,9 +37,6 @@
* Asynchronous Socket internal API declarations
*******************************************************************************/
-/* Asynchronous socket I/O (reader, or writer) descriptor. */
-typedef struct AsyncSocketIO AsyncSocketIO;
-
/* Gets socket's address string. */
static const char* _async_socket_string(AsyncSocket* as);
@@ -48,10 +45,11 @@ static Looper* _async_socket_get_looper(AsyncSocket* as);
/* Handler for the I/O time out.
* Param:
- * as - Asynchronous socket for the reader.
+ * as - Asynchronous socket for the I/O.
* asio - Desciptor for the timed out I/O.
*/
-static void _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio);
+static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
+ AsyncSocketIO* asio);
/********************************************************************************
* Asynchronous Socket Reader / Writer
@@ -72,10 +70,12 @@ struct AsyncSocketIO {
uint32_t to_transfer;
/* Bytes thransferred through the socket in this I/O. */
uint32_t transferred;
- /* I/O callbacks for this I/O. */
- const ASIOCb* io_cb;
+ /* I/O callback for this I/O. */
+ on_as_io_cb on_io;
/* I/O type selector: 1 - read, 0 - write. */
int is_io_read;
+ /* State of the I/O. */
+ AsyncIOState state;
};
/*
@@ -105,7 +105,7 @@ static void _on_async_socket_io_timed_out(void* opaque);
* as - Asynchronous socket for the I/O.
* is_io_read - I/O type selector: 1 - read, 0 - write.
* buffer, len - Reader / writer buffer address.
- * io_cb - Callbacks for this reader / writer.
+ * io_cb - Callback for this reader / writer.
* io_opaque - An opaque pointer associated with the I/O.
* deadline - Deadline to complete the I/O.
* Return:
@@ -116,7 +116,7 @@ _async_socket_rw_new(AsyncSocket* as,
int is_io_read,
void* buffer,
uint32_t len,
- const ASIOCb* io_cb,
+ on_as_io_cb io_cb,
void* io_opaque,
Duration deadline)
{
@@ -137,8 +137,9 @@ _async_socket_rw_new(AsyncSocket* as,
asio->buffer = (uint8_t*)buffer;
asio->to_transfer = len;
asio->transferred = 0;
- asio->io_cb = io_cb;
+ asio->on_io = io_cb;
asio->io_opaque = io_opaque;
+ asio->state = ASIO_STATE_QUEUED;
loopTimer_init(asio->timer, _async_socket_get_looper(as),
_on_async_socket_io_timed_out, asio);
@@ -168,7 +169,7 @@ _async_socket_io_destroy(AsyncSocketIO* asio)
* Param:
* as - Asynchronous socket for the reader.
* buffer, len - Reader's buffer.
- * io_cb - Lists reader's callbacks.
+ * io_cb - Reader's callback.
* reader_opaque - An opaque pointer associated with the reader.
* deadline - Deadline to complete the operation.
* Return:
@@ -178,7 +179,7 @@ static AsyncSocketIO*
_async_socket_reader_new(AsyncSocket* as,
void* buffer,
uint32_t len,
- const ASIOCb* io_cb,
+ on_as_io_cb io_cb,
void* reader_opaque,
Duration deadline)
{
@@ -191,7 +192,7 @@ _async_socket_reader_new(AsyncSocket* as,
* Param:
* as - Asynchronous socket for the writer.
* buffer, len - Writer's buffer.
- * io_cb - Lists writer's callbacks.
+ * io_cb - Writer's callback.
* writer_opaque - An opaque pointer associated with the writer.
* deadline - Deadline to complete the operation.
* Return:
@@ -201,7 +202,7 @@ static AsyncSocketIO*
_async_socket_writer_new(AsyncSocket* as,
const void* buffer,
uint32_t len,
- const ASIOCb* io_cb,
+ on_as_io_cb io_cb,
void* writer_opaque,
Duration deadline)
{
@@ -216,8 +217,76 @@ static void
_on_async_socket_io_timed_out(void* opaque)
{
AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
- _async_socket_io_timed_out(asio->as, asio);
- _async_socket_io_destroy(asio);
+ const AsyncIOAction action = _async_socket_io_timed_out(asio->as, asio);
+ if (action != ASIO_ACTION_RETRY) {
+ _async_socket_io_destroy(asio);
+ }
+}
+
+/********************************************************************************
+ * Public Asynchronous Socket I/O API
+ *******************************************************************************/
+
+AsyncSocket*
+async_socket_io_get_socket(const AsyncSocketIO* asio)
+{
+ return asio->as;
+}
+
+void
+async_socket_io_cancel_time_out(AsyncSocketIO* asio)
+{
+ loopTimer_stop(asio->timer);
+}
+
+void*
+async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
+{
+ return asio->io_opaque;
+}
+
+void*
+async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
+{
+ return async_socket_get_client_opaque(asio->as);
+}
+
+void*
+async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
+ uint32_t* transferred,
+ uint32_t* to_transfer)
+{
+ if (transferred != NULL) {
+ *transferred = asio->transferred;
+ }
+ if (to_transfer != NULL) {
+ *to_transfer = asio->to_transfer;
+ }
+ return asio->buffer;
+}
+
+void*
+async_socket_io_get_buffer(const AsyncSocketIO* asio)
+{
+ return asio->buffer;
+}
+
+uint32_t
+async_socket_io_get_transferred(const AsyncSocketIO* asio)
+{
+ return asio->transferred;
+}
+
+uint32_t
+async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
+{
+ return asio->to_transfer;
+}
+
+int
+async_socket_io_is_read(const AsyncSocketIO* asio)
+{
+ return asio->is_io_read;
}
/********************************************************************************
@@ -227,8 +296,8 @@ _on_async_socket_io_timed_out(void* opaque)
struct AsyncSocket {
/* TCP address for the socket. */
SockAddress address;
- /* Client callbacks for this socket. */
- const ASClientCb* client_cb;
+ /* Connection callback for this socket. */
+ on_as_connection_cb on_connection;
/* An opaque pointer associated with this socket by the client. */
void* client_opaque;
/* I/O looper for asynchronous I/O on the socket. */
@@ -353,7 +422,7 @@ _async_socket_remove_io(AsyncSocket* as,
* as - Initialized AsyncSocket instance.
* list_head, list_tail - Pointers to the list head and tail.
* Return:
- * Next I/O at the head of the list, or NULL if I/O list become empty.
+ * Next I/O at the head of the list, or NULL if I/O list became empty.
*/
static AsyncSocketIO*
_async_socket_advance_io(AsyncSocket* as,
@@ -375,7 +444,7 @@ _async_socket_advance_io(AsyncSocket* as,
* Param:
* as - Initialized AsyncSocket instance.
* Return:
- * Next reader at the head of the list, or NULL if reader list become empty.
+ * Next reader at the head of the list, or NULL if reader list became empty.
*/
static AsyncSocketIO*
_async_socket_advance_reader(AsyncSocket* as)
@@ -387,7 +456,7 @@ _async_socket_advance_reader(AsyncSocket* as)
* Param:
* as - Initialized AsyncSocket instance.
* Return:
- * Next writer at the head of the list, or NULL if writer list become empty.
+ * Next writer at the head of the list, or NULL if writer list became empty.
*/
static AsyncSocketIO*
_async_socket_advance_writer(AsyncSocket* as)
@@ -399,75 +468,58 @@ _async_socket_advance_writer(AsyncSocket* as)
* Param:
* as - Initialized AsyncSocket instance.
* asio - I/O to complete.
+ * Return:
+ * One of AsyncIOAction values.
*/
-static void
+static AsyncIOAction
_async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
{
/* Stop the timer. */
loopTimer_stop(asio->timer);
- /* Report I/O completion. First report via I/O callback, and only if it is
- * not set, report via client callback. */
- if (asio->io_cb && asio->io_cb->on_completed) {
- asio->io_cb->on_completed(as->client_opaque, as, asio->is_io_read,
- asio->io_opaque, asio->buffer, asio->transferred);
- } else if (as->client_cb->io_cb && as->client_cb->io_cb->on_completed) {
- as->client_cb->io_cb->on_completed(as->client_opaque, as, asio->is_io_read,
- asio->io_opaque, asio->buffer,
- asio->transferred);
- }
+ return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
}
/* Timeouts an I/O.
* Param:
* as - Initialized AsyncSocket instance.
* asio - An I/O that has timed out.
+ * Return:
+ * One of AsyncIOAction values.
*/
-static void
+static AsyncIOAction
_async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
{
- /* Remove the I/O from a list of active I/O. */
- if (asio->is_io_read) {
- _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
- } else {
- _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
+ /* Report to the client. */
+ const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
+ ASIO_STATE_TIMED_OUT);
+
+ /* Remove the I/O from a list of active I/O for actions other than retry. */
+ if (action != ASIO_ACTION_RETRY) {
+ if (asio->is_io_read) {
+ _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
+ } else {
+ _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
+ }
}
- /* Report I/O time out. First report it via I/O callbacks, and only if it is
- * not set, report it via client callbacks. */
- if (asio->io_cb && asio->io_cb->on_timed_out) {
- asio->io_cb->on_timed_out(as->client_opaque, as, asio->is_io_read,
- asio->io_opaque, asio->buffer,
- asio->transferred, asio->to_transfer);
- } else if (as->client_cb->io_cb && as->client_cb->io_cb->on_timed_out) {
- as->client_cb->io_cb->on_timed_out(as->client_opaque, as, asio->is_io_read,
- asio->io_opaque, asio->buffer,
- asio->transferred, asio->to_transfer);
- }
+ return action;
}
/* Cancels an I/O.
* Param:
* as - Initialized AsyncSocket instance.
* asio - An I/O to cancel.
+ * Return:
+ * One of AsyncIOAction values.
*/
-static void
+static AsyncIOAction
_async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
{
/* Stop the timer. */
loopTimer_stop(asio->timer);
- /* Report I/O cancellation. First report it via I/O callbacks, and only if it
- * is not set, report it via client callbacks. */
- if (asio->io_cb && asio->io_cb->on_cancelled) {
- asio->io_cb->on_cancelled(as->client_opaque, as, asio->is_io_read,
- asio->io_opaque, asio->buffer,
- asio->transferred, asio->to_transfer);
- } else if (as->client_cb->io_cb && as->client_cb->io_cb->on_cancelled) {
- as->client_cb->io_cb->on_cancelled(as->client_opaque, as, asio->is_io_read,
- asio->io_opaque, asio->buffer,
- asio->transferred, asio->to_transfer);
- }
+ return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
}
/* Reports an I/O failure.
@@ -475,25 +527,17 @@ _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
* as - Initialized AsyncSocket instance.
* asio - An I/O that has failed. Can be NULL for general failures.
* failure - Failure (errno) that has occurred.
+ * Return:
+ * One of AsyncIOAction values.
*/
-static void
+static AsyncIOAction
_async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
{
/* Stop the timer. */
loopTimer_stop(asio->timer);
- /* Report I/O failure. First report it via I/O callbacks, and only if it
- * is not set, report it via client callbacks. */
- if (asio && asio->io_cb && asio->io_cb->on_io_failure) {
- asio->io_cb->on_io_failure(as->client_opaque, as, asio->is_io_read,
- asio->io_opaque, asio->buffer,
- asio->transferred, asio->to_transfer, failure);
- } else if (as->client_cb->io_cb && as->client_cb->io_cb->on_io_failure) {
- as->client_cb->io_cb->on_io_failure(as->client_opaque, as,
- asio->is_io_read, asio->io_opaque,
- asio->buffer, asio->transferred,
- asio->to_transfer, failure);
- }
+ errno = failure;
+ return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
}
/* Cancels all the active socket readers.
@@ -505,6 +549,8 @@ _async_socket_cancel_readers(AsyncSocket* as)
{
while (as->readers_head != NULL) {
AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
+ /* We ignore action returned from the cancellation callback, since we're
+ * in a disconnected state here. */
_async_socket_cancel_io(as, to_cancel);
_async_socket_io_destroy(to_cancel);
}
@@ -519,6 +565,8 @@ _async_socket_cancel_writers(AsyncSocket* as)
{
while (as->writers_head != NULL) {
AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
+ /* We ignore action returned from the cancellation callback, since we're
+ * in a disconnected state here. */
_async_socket_cancel_io(as, to_cancel);
_async_socket_io_destroy(to_cancel);
}
@@ -607,7 +655,7 @@ _on_async_socket_disconnected(AsyncSocket* as)
{
/* Save error to restore it for the client's callback. */
const int save_errno = errno;
- ASConnectAction action = ASCA_ABORT;
+ AsyncIOAction action = ASIO_ACTION_ABORT;
D("Async socket '%s' is disconnected. Error %d -> %s",
_async_socket_string(as), errno, strerror(errno));
@@ -620,14 +668,11 @@ _on_async_socket_disconnected(AsyncSocket* as)
/* Restore errno, and invoke client's callback. */
errno = save_errno;
- action = as->client_cb->on_connection(as->client_opaque, as,
- ASCS_DISCONNECTED);
+ action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
- if (action == ASCA_RETRY) {
+ if (action == ASIO_ACTION_RETRY) {
/* Client requested reconnection. */
- if (as->reconnect_to) {
- _async_socket_reconnect(as, as->reconnect_to);
- }
+ _async_socket_reconnect(as, as->reconnect_to);
}
}
@@ -636,14 +681,14 @@ _on_async_socket_disconnected(AsyncSocket* as)
* as - Initialized AsyncSocket instance.
* asio - Descriptor for the failed I/O. Can be NULL for general failures.
*/
-static void
+static AsyncIOAction
_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
{
- D("Async socket '%s' I/O failure %d: %s",
+ D("Async socket '%s' I/O failure: %d -> %s",
_async_socket_string(as), errno, strerror(errno));
/* Report the failure. */
- _async_socket_io_failure(as, asio, errno);
+ return _async_socket_io_failure(as, asio, errno);
}
/* A callback that is invoked when there is data available to read.
@@ -656,6 +701,8 @@ _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
static int
_on_async_socket_recv(AsyncSocket* as)
{
+ AsyncIOAction action;
+
/* Get current reader. */
AsyncSocketIO* const asr = as->readers_head;
if (asr == NULL) {
@@ -665,6 +712,29 @@ _on_async_socket_recv(AsyncSocket* as)
return 0;
}
+ /* Bump I/O state, and inform the client that I/O is in progress. */
+ if (asr->state == ASIO_STATE_QUEUED) {
+ asr->state = ASIO_STATE_STARTED;
+ } else {
+ asr->state = ASIO_STATE_CONTINUES;
+ }
+ action = asr->on_io(asr->io_opaque, asr, asr->state);
+ if (action == ASIO_ACTION_ABORT) {
+ D("Read is aborted by the client of async socket '%s'",
+ _async_socket_string(as));
+ /* Move on to the next reader. */
+ _async_socket_advance_reader(as);
+ _async_socket_io_destroy(asr);
+ /* Lets see if there are still active readers, and enable, or disable
+ * read I/O callback accordingly. */
+ if (as->readers_head != NULL) {
+ loopIo_wantRead(as->io);
+ } else {
+ loopIo_dontWantRead(as->io);
+ }
+ return 0;
+ }
+
/* Read next chunk of data. */
int res = socket_recv(as->fd, asr->buffer + asr->transferred,
asr->to_transfer - asr->transferred);
@@ -688,7 +758,21 @@ _on_async_socket_recv(AsyncSocket* as)
}
/* An I/O error. */
- _on_async_socket_failure(as, asr);
+ action = _on_async_socket_failure(as, asr);
+ if (action == ASIO_ACTION_ABORT) {
+ D("Read is aborted on failure by the client of async socket '%s'",
+ _async_socket_string(as));
+ /* Move on to the next reader. */
+ _async_socket_advance_reader(as);
+ _async_socket_io_destroy(asr);
+ /* Lets see if there are still active readers, and enable, or disable
+ * read I/O callback accordingly. */
+ if (as->readers_head != NULL) {
+ loopIo_wantRead(as->io);
+ } else {
+ loopIo_dontWantRead(as->io);
+ }
+ }
return -1;
}
@@ -724,6 +808,8 @@ _on_async_socket_recv(AsyncSocket* as)
static int
_on_async_socket_send(AsyncSocket* as)
{
+ AsyncIOAction action;
+
/* Get current writer. */
AsyncSocketIO* const asw = as->writers_head;
if (asw == NULL) {
@@ -733,6 +819,29 @@ _on_async_socket_send(AsyncSocket* as)
return 0;
}
+ /* Bump I/O state, and inform the client that I/O is in progress. */
+ if (asw->state == ASIO_STATE_QUEUED) {
+ asw->state = ASIO_STATE_STARTED;
+ } else {
+ asw->state = ASIO_STATE_CONTINUES;
+ }
+ action = asw->on_io(asw->io_opaque, asw, asw->state);
+ if (action == ASIO_ACTION_ABORT) {
+ D("Write is aborted by the client of async socket '%s'",
+ _async_socket_string(as));
+ /* Move on to the next reader. */
+ _async_socket_advance_reader(as);
+ _async_socket_io_destroy(asw);
+ /* Lets see if there are still active writers, and enable, or disable
+ * write I/O callback accordingly. */
+ if (as->writers_head != NULL) {
+ loopIo_wantWrite(as->io);
+ } else {
+ loopIo_dontWantWrite(as->io);
+ }
+ return 0;
+ }
+
/* Write next chunk of data. */
int res = socket_send(as->fd, asw->buffer + asw->transferred,
asw->to_transfer - asw->transferred);
@@ -756,7 +865,22 @@ _on_async_socket_send(AsyncSocket* as)
}
/* An I/O error. */
- _on_async_socket_failure(as, asw);
+ /* An I/O error. */
+ action = _on_async_socket_failure(as, asw);
+ if (action == ASIO_ACTION_ABORT) {
+ D("Write is aborted on failure by the client of async socket '%s'",
+ _async_socket_string(as));
+ /* Move on to the next reader. */
+ _async_socket_advance_reader(as);
+ _async_socket_io_destroy(asw);
+ /* Lets see if there are still active writers, and enable, or disable
+ * write I/O callback accordingly. */
+ if (as->writers_head != NULL) {
+ loopIo_wantWrite(as->io);
+ } else {
+ loopIo_dontWantWrite(as->io);
+ }
+ }
return -1;
}
@@ -813,51 +937,33 @@ _on_async_socket_io(void* opaque, int fd, unsigned events)
* connector - Connector that is used to connect this socket.
* event - Connection event.
* Return:
- * One of ASCCbRes values.
+ * One of AsyncIOAction values.
*/
-static ASCCbRes
+static AsyncIOAction
_on_connector_events(void* opaque,
AsyncSocketConnector* connector,
- ASCEvent event)
+ AsyncIOState event)
{
- ASConnectAction action;
- ASConnectStatus adsc_status;
+ AsyncIOAction action;
AsyncSocket* const as = (AsyncSocket*)opaque;
- /* Convert connector event into async socket connection event. */
- switch (event) {
- case ASC_CONNECTION_SUCCEEDED:
- /* Accept the connection. */
- adsc_status = ASCS_CONNECTED;
- as->fd = async_socket_connector_pull_fd(connector);
- loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
- break;
-
- case ASC_CONNECTION_RETRY:
- adsc_status = ASCS_RETRY;
- break;
-
- case ASC_CONNECTION_FAILED:
- default:
- adsc_status = ASCS_FAILURE;
- break;
+ if (event == ASIO_STATE_SUCCEEDED) {
+ /* Accept the connection. */
+ as->fd = async_socket_connector_pull_fd(connector);
+ loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
}
/* Invoke client's callback. */
- action = as->client_cb->on_connection(as->client_opaque, as, adsc_status);
- if (event == ASC_CONNECTION_SUCCEEDED && action != ASCA_KEEP) {
+ action = as->on_connection(as->client_opaque, as, event);
+ if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
/* For whatever reason the client didn't want to keep this connection.
* Close it. */
+ D("Connection is discarded by a client of async socket '%s'",
+ _async_socket_string(as));
_async_socket_close_socket(as);
}
- if (action == ASCA_RETRY) {
- return ASC_CB_RETRY;
- } else if (action == ASCA_ABORT) {
- return ASC_CB_ABORT;
- } else {
- return ASC_CB_KEEP;
- }
+ return action;
}
/* Timer callback invoked to reconnect the lost connection.
@@ -879,12 +985,12 @@ _on_async_socket_reconnect(void* opaque)
AsyncSocket*
async_socket_new(int port,
int reconnect_to,
- const ASClientCb* client_cb,
+ on_as_connection_cb client_cb,
void* client_opaque)
{
AsyncSocket* as;
- if (client_cb == NULL || client_cb->on_connection == NULL) {
+ if (client_cb == NULL) {
E("Invalid client_cb parameter");
return NULL;
}
@@ -893,7 +999,7 @@ async_socket_new(int port,
as->fd = -1;
as->client_opaque = client_opaque;
- as->client_cb = client_cb;
+ as->on_connection = client_cb;
as->readers_head = as->readers_tail = NULL;
as->reconnect_to = reconnect_to;
sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
@@ -901,6 +1007,7 @@ async_socket_new(int port,
if (as->looper == NULL) {
E("Unable to create I/O looper for async socket '%s'",
_async_socket_string(as));
+ client_cb(client_opaque, as, ASIO_STATE_FAILED);
_async_socket_destroy(as);
return NULL;
}
@@ -909,15 +1016,16 @@ async_socket_new(int port,
return as;
}
-int
+void
async_socket_connect(AsyncSocket* as, int retry_to)
{
AsyncSocketConnector* const connector =
async_socket_connector_new(&as->address, retry_to, _on_connector_events, as);
- if (connector == NULL) {
- return -1;
+ if (connector != NULL) {
+ async_socket_connector_connect(connector);
+ } else {
+ as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
}
- return (async_socket_connector_connect(connector) == ASC_CONNECT_FAILED) ? -1 : 0;
}
void
@@ -930,18 +1038,18 @@ async_socket_disconnect(AsyncSocket* as)
}
}
-int
+void
async_socket_reconnect(AsyncSocket* as, int retry_to)
{
_async_socket_cancel_all_io(as);
_async_socket_close_socket(as);
- return async_socket_connect(as, retry_to);
+ async_socket_connect(as, retry_to);
}
-int
+void
async_socket_read_abs(AsyncSocket* as,
void* buffer, uint32_t len,
- const ASIOCb* reader_cb,
+ on_as_io_cb reader_cb,
void* reader_opaque,
Duration deadline)
{
@@ -955,25 +1063,24 @@ async_socket_read_abs(AsyncSocket* as,
as->readers_tail = asr;
}
loopIo_wantRead(as->io);
- return 0;
}
-int
+void
async_socket_read_rel(AsyncSocket* as,
void* buffer, uint32_t len,
- const ASIOCb* reader_cb,
+ on_as_io_cb reader_cb,
void* reader_opaque,
int to)
{
const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
DURATION_INFINITE;
- return async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
+ async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
}
-int
+void
async_socket_write_abs(AsyncSocket* as,
const void* buffer, uint32_t len,
- const ASIOCb* writer_cb,
+ on_as_io_cb writer_cb,
void* writer_opaque,
Duration deadline)
{
@@ -987,16 +1094,29 @@ async_socket_write_abs(AsyncSocket* as,
as->writers_tail = asw;
}
loopIo_wantWrite(as->io);
- return 0;
}
-int async_socket_write_rel(AsyncSocket* as,
- const void* buffer, uint32_t len,
- const ASIOCb* writer_cb,
- void* writer_opaque,
- int to)
+void
+async_socket_write_rel(AsyncSocket* as,
+ const void* buffer, uint32_t len,
+ on_as_io_cb writer_cb,
+ void* writer_opaque,
+ int to)
{
const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
DURATION_INFINITE;
- return async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
+ async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
+}
+
+void*
+async_socket_get_client_opaque(const AsyncSocket* as)
+{
+ return as->client_opaque;
+}
+
+Duration
+async_socket_deadline(AsyncSocket* as, int rel)
+{
+ return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
+ DURATION_INFINITE;
}