From ef4ccd385650612a830a098f4b1eac48482b65b3 Mon Sep 17 00:00:00 2001 From: Vladimir Chtchetkine Date: Tue, 3 Apr 2012 10:27:12 -0700 Subject: Make all async I/O object referenced. Since it's hard to control lifespan of an object in asynchronous environment, we should make all AsyncXxx objects a referenced objecst, that will self-destruct when its reference count drops to zero, indicating that the last client that used the object has abandoned it. Change-Id: I6f8194aa14e52a23a8772d827583782989654504 --- android/async-socket-connector.c | 112 +++++++++++++++-------- android/async-socket-connector.h | 27 +++++- android/async-socket.c | 190 +++++++++++++++++++++++++++++++-------- android/async-socket.h | 47 +++++++++- android/async-utils.c | 11 +++ android/async-utils.h | 8 ++ 6 files changed, 316 insertions(+), 79 deletions(-) diff --git a/android/async-socket-connector.c b/android/async-socket-connector.c index 41a13e0..80ec5bb 100644 --- a/android/async-socket-connector.c +++ b/android/async-socket-connector.c @@ -55,6 +55,8 @@ struct AsyncSocketConnector { int retry_to; /* Socket descriptor for the connection. */ int fd; + /* Number of outstanding references to the connector. */ + int ref_count; }; /* Asynchronous I/O looper callback invoked by the connector. @@ -79,11 +81,20 @@ static void _async_socket_connector_free(AsyncSocketConnector* connector) { if (connector != NULL) { + D("%s: Connector is destroyed.", _asc_socket_string(connector)); + if (asyncConnector_stop(connector->connector) == 0) { + /* Connection was in progress. We need to destroy I/O descriptor for + * that connection. */ + D("%s: Stopped async connection in progress.", + _asc_socket_string(connector)); + loopIo_done(connector->connector_io); + } if (connector->fd >= 0) { socket_close(connector->fd); } if (connector->looper != NULL) { + loopTimer_stop(connector->connector_timer); loopTimer_done(connector->connector_timer); looper_free(connector->looper); } @@ -106,15 +117,13 @@ _async_socket_connector_open_socket(AsyncSocketConnector* connector) /* Open socket. */ connector->fd = socket_create_inet(SOCKET_STREAM); if (connector->fd < 0) { - D("Unable to create AsyncSocketConnector socket for '%s'. Error: %d -> %s", + D("%s: Unable to create socket: %d -> %s", _asc_socket_string(connector), errno, strerror(errno)); return -1; } /* Prepare for async I/O on the connector. */ socket_set_nonblock(connector->fd); - loopIo_init(connector->connector_io, connector->looper, connector->fd, - _on_async_socket_connector_io, connector); return 0; } @@ -136,12 +145,11 @@ _async_socket_connector_close_socket(AsyncSocketConnector* connector) /* Asynchronous connector (AsyncConnector instance) has completed connection * attempt. - * - * NOTE: Upon exit from this routine AsyncSocketConnector instance might be - * destroyed. So, once this routine is called, there must be no further - * references to AsyncSocketConnector instance passed to this routine. * Param: - * connector - Initialized AsyncSocketConnector instance. + * connector - Initialized AsyncSocketConnector instance. Note: When this + * callback is called, the caller has referenced passed connector object, + * So, it's guaranteed that this connector is not going to be destroyed + * while this routine executes. * status - Status of the connection attempt. */ static void @@ -173,14 +181,11 @@ _on_async_socket_connector_connecting(AsyncSocketConnector* connector, } if (action == ASIO_ACTION_RETRY) { - D("Retrying connection to socket '%s'", _asc_socket_string(connector)); + D("%s: Retrying connection", _asc_socket_string(connector)); loopTimer_startRelative(connector->connector_timer, connector->retry_to); - } else { - if (action == ASIO_ACTION_ABORT) { - D("%s: AsyncSocketConnector client for socket '%s' has aborted connection", - __FUNCTION__, _asc_socket_string(connector)); - } - _async_socket_connector_free(connector); + } else if (action == ASIO_ACTION_ABORT) { + D("%s: AsyncSocketConnector client for socket '%s' has aborted connection", + __FUNCTION__, _asc_socket_string(connector)); } } @@ -189,6 +194,9 @@ _on_async_socket_connector_io(void* opaque, int fd, unsigned events) { AsyncSocketConnector* const connector = (AsyncSocketConnector*)opaque; + /* Reference the connector while we're handing I/O. */ + async_socket_connector_reference(connector); + /* Notify the client that another connection attempt is about to start. */ const AsyncIOAction action = connector->on_connected_cb(connector->on_connected_cb_opaque, @@ -200,8 +208,10 @@ _on_async_socket_connector_io(void* opaque, int fd, unsigned events) } else { D("%s: AsyncSocketConnector client for socket '%s' has aborted connection", __FUNCTION__, _asc_socket_string(connector)); - _async_socket_connector_free(connector); } + + /* Release the connector after we're done with handing I/O. */ + async_socket_connector_release(connector); } /* Retry connection timer callback. @@ -214,30 +224,37 @@ _on_async_socket_connector_retry(void* opaque) AsyncStatus status; AsyncSocketConnector* const connector = (AsyncSocketConnector*)opaque; + /* Reference the connector while we're in callback. */ + async_socket_connector_reference(connector); + /* Invoke the callback to notify about a connection retry attempt. */ AsyncIOAction action = connector->on_connected_cb(connector->on_connected_cb_opaque, connector, ASIO_STATE_RETRYING); - if (action == ASIO_ACTION_ABORT) { - D("%s: AsyncSocketConnector client for socket '%s' has aborted connection", - __FUNCTION__, _asc_socket_string(connector)); - _async_socket_connector_free(connector); - return; - } + if (action != ASIO_ACTION_ABORT) { + /* Close handle opened for the previous (failed) attempt. */ + _async_socket_connector_close_socket(connector); - /* Close handle opened for the previous (failed) attempt. */ - _async_socket_connector_close_socket(connector); + /* Retry connection attempt. */ + if (_async_socket_connector_open_socket(connector) == 0) { + loopIo_init(connector->connector_io, connector->looper, + connector->fd, _on_async_socket_connector_io, connector); + status = asyncConnector_init(connector->connector, + &connector->address, + connector->connector_io); + } else { + status = ASYNC_ERROR; + } - /* Retry connection attempt. */ - if (_async_socket_connector_open_socket(connector) == 0) { - status = asyncConnector_init(connector->connector, &connector->address, - connector->connector_io); + _on_async_socket_connector_connecting(connector, status); } else { - status = ASYNC_ERROR; + D("%s: AsyncSocketConnector client for socket '%s' has aborted connection", + __FUNCTION__, _asc_socket_string(connector)); } - _on_async_socket_connector_connecting(connector, status); + /* Release the connector after we're done with the callback. */ + async_socket_connector_release(connector); } /******************************************************************************** @@ -265,6 +282,7 @@ async_socket_connector_new(const SockAddress* address, connector->retry_to = retry_to; connector->on_connected_cb = cb; connector->on_connected_cb_opaque = cb_opaque; + connector->ref_count = 1; /* Copy socket address. */ if (sock_address_get_family(address) == SOCKET_UNIX) { @@ -275,11 +293,7 @@ async_socket_connector_new(const SockAddress* address, /* Create a looper for asynchronous I/O. */ connector->looper = looper_newCore(); - if (connector->looper != NULL) { - /* Create a timer that will be used for connection retries. */ - loopTimer_init(connector->connector_timer, connector->looper, - _on_async_socket_connector_retry, connector); - } else { + if (connector->looper == NULL) { E("Unable to create I/O looper for AsyncSocketConnector for socket '%s'", _asc_socket_string(connector)); cb(cb_opaque, connector, ASIO_STATE_FAILED); @@ -287,9 +301,34 @@ async_socket_connector_new(const SockAddress* address, return NULL; } + /* Create a timer that will be used for connection retries. */ + loopTimer_init(connector->connector_timer, connector->looper, + _on_async_socket_connector_retry, connector); + return connector; } +int +async_socket_connector_reference(AsyncSocketConnector* connector) +{ + assert(connector->ref_count > 0); + connector->ref_count++; + return connector->ref_count; +} + +int +async_socket_connector_release(AsyncSocketConnector* connector) +{ + assert(connector->ref_count > 0); + connector->ref_count--; + if (connector->ref_count == 0) { + /* Last reference has been dropped. Destroy this object. */ + _async_socket_connector_free(connector); + return 0; + } + return connector->ref_count; +} + void async_socket_connector_connect(AsyncSocketConnector* connector) { @@ -302,9 +341,10 @@ async_socket_connector_connect(AsyncSocketConnector* connector) if (action == ASIO_ACTION_ABORT) { D("%s: AsyncSocketConnector client for socket '%s' has aborted connection", __FUNCTION__, _asc_socket_string(connector)); - _async_socket_connector_free(connector); return; } else { + loopIo_init(connector->connector_io, connector->looper, + connector->fd, _on_async_socket_connector_io, connector); status = asyncConnector_init(connector->connector, &connector->address, connector->connector_io); diff --git a/android/async-socket-connector.h b/android/async-socket-connector.h index 4c05059..0769f56 100644 --- a/android/async-socket-connector.h +++ b/android/async-socket-connector.h @@ -58,9 +58,10 @@ * to cancel further connection attempts by returning ASIO_ACTION_ABORT, or it * can allow another connection attempt by returning ASIO_ACTION_RETRY. * - * The client has no control over the lifespan of initialized connector instance. - * It always self-destructs after client's cllback returns with a status other - * than ASIO_ACTION_RETRY. + * Since it's hard to control lifespan of an object in asynchronous environment, + * we make AsyncSocketConnector a referenced object, that will self-destruct when + * its reference count drops to zero, indicating that the last client has + * abandoned that object. */ /* Declares async socket connector descriptor. */ @@ -81,6 +82,8 @@ typedef AsyncIOAction (*asc_event_cb)(void* opaque, AsyncIOState event); /* Creates and initializes AsyncSocketConnector instance. + * Note that upon exit from this routine the reference count to the returned + * object is set to 1. * Param: * address - Initialized socket address to connect to. * retry_to - Timeout to retry a failed connection attempt in milliseconds. @@ -98,6 +101,24 @@ extern AsyncSocketConnector* async_socket_connector_new(const SockAddress* addre asc_event_cb cb, void* cb_opaque); +/* References AsyncSocketConnector object. + * Param: + * connector - Initialized AsyncSocketConnector instance. + * Return: + * Number of outstanding references to the object. + */ +extern int async_socket_connector_reference(AsyncSocketConnector* connector); + +/* Releases AsyncSocketConnector object. + * Note that upon exit from this routine the object might be destroyed, even if + * the routine returns value other than zero. + * Param: + * connector - Initialized AsyncSocketConnector instance. + * Return: + * Number of outstanding references to the object. + */ +extern int async_socket_connector_release(AsyncSocketConnector* connector); + /* Initiates asynchronous connection. * Note that connection result will be reported via callback set with the call to * async_socket_connector_new routine. diff --git a/android/async-socket.c b/android/async-socket.c index 4be69f6..a2577c9 100644 --- a/android/async-socket.c +++ b/android/async-socket.c @@ -76,6 +76,10 @@ struct AsyncSocketIO { int is_io_read; /* State of the I/O. */ AsyncIOState state; + /* Number of outstanding references to the I/O. */ + int ref_count; + /* Deadline for this I/O */ + Duration deadline; }; /* @@ -140,18 +144,24 @@ _async_socket_rw_new(AsyncSocket* as, asio->on_io = io_cb; asio->io_opaque = io_opaque; asio->state = ASIO_STATE_QUEUED; - + asio->ref_count = 1; + asio->deadline = deadline; loopTimer_init(asio->timer, _async_socket_get_looper(as), _on_async_socket_io_timed_out, asio); loopTimer_startAbsolute(asio->timer, deadline); + /* Reference socket that is holding this I/O. */ + async_socket_reference(as); + return asio; } /* Destroys and frees I/O descriptor. */ static void -_async_socket_io_destroy(AsyncSocketIO* asio) +_async_socket_io_free(AsyncSocketIO* asio) { + AsyncSocket* const as = asio->as; + loopTimer_stop(asio->timer); loopTimer_done(asio->timer); @@ -163,6 +173,30 @@ _async_socket_io_destroy(AsyncSocketIO* asio) } else { AFREE(asio); } + + /* Release socket that is holding this I/O. */ + async_socket_release(as); +} + +int +async_socket_io_reference(AsyncSocketIO* asio) +{ + assert(asio->ref_count > 0); + asio->ref_count++; + return asio->ref_count; +} + +int +async_socket_io_release(AsyncSocketIO* asio) +{ + assert(asio->ref_count > 0); + asio->ref_count--; + if (asio->ref_count == 0) { + /* Last reference has been dropped. Destroy this object. */ + _async_socket_io_free(asio); + return 0; + } + return asio->ref_count; } /* Creates new asynchronous socket reader. @@ -217,10 +251,16 @@ static void _on_async_socket_io_timed_out(void* opaque) { AsyncSocketIO* const asio = (AsyncSocketIO*)opaque; - const AsyncIOAction action = _async_socket_io_timed_out(asio->as, asio); - if (action != ASIO_ACTION_RETRY) { - _async_socket_io_destroy(asio); - } + AsyncSocket* const as = asio->as; + + D("%s: %s I/O (deadline = %lld) timed out at %lld", + _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", + asio->deadline, async_socket_deadline(as, 0)); + + /* Reference while in callback. */ + async_socket_io_reference(asio); + _async_socket_io_timed_out(asio->as, asio); + async_socket_io_release(asio); } /******************************************************************************** @@ -230,6 +270,7 @@ _on_async_socket_io_timed_out(void* opaque) AsyncSocket* async_socket_io_get_socket(const AsyncSocketIO* asio) { + async_socket_reference(asio->as); return asio->as; } @@ -318,6 +359,8 @@ struct AsyncSocket { int fd; /* Timeout to use for reconnection attempts. */ int reconnect_to; + /* Number of outstanding references to the socket. */ + int ref_count; }; static const char* @@ -337,6 +380,8 @@ _async_socket_get_looper(AsyncSocket* as) * as - Initialized AsyncSocket instance. * Return: * First I/O pulled out of the list, or NULL if there are no I/O in the list. + * Note that the caller is responsible for releasing the I/O object returned + * from this routine. */ static AsyncSocketIO* _async_socket_pull_first_io(AsyncSocket* as, @@ -360,6 +405,8 @@ _async_socket_pull_first_io(AsyncSocket* as, * Return: * First reader pulled out of the list, or NULL if there are no readers in the * list. + * Note that the caller is responsible for releasing the I/O object returned + * from this routine. */ static AsyncSocketIO* _async_socket_pull_first_reader(AsyncSocket* as) @@ -373,6 +420,8 @@ _async_socket_pull_first_reader(AsyncSocket* as) * Return: * First writer pulled out of the list, or NULL if there are no writers in the * list. + * Note that the caller is responsible for releasing the I/O object returned + * from this routine. */ static AsyncSocketIO* _async_socket_pull_first_writer(AsyncSocket* as) @@ -414,6 +463,9 @@ _async_socket_remove_io(AsyncSocket* as, *list_tail = prev; } + /* Release I/O adjusting reference added when I/O has been saved in the list. */ + async_socket_io_release(io); + return 1; } @@ -421,10 +473,8 @@ _async_socket_remove_io(AsyncSocket* as, * Param: * as - Initialized AsyncSocket instance. * list_head, list_tail - Pointers to the list head and tail. - * Return: - * Next I/O at the head of the list, or NULL if I/O list became empty. */ -static AsyncSocketIO* +static void _async_socket_advance_io(AsyncSocket* as, AsyncSocketIO** list_head, AsyncSocketIO** list_tail) @@ -437,31 +487,30 @@ _async_socket_advance_io(AsyncSocket* as, if (*list_head == NULL) { *list_tail = NULL; } - return *list_head; + if (first_io != NULL) { + /* Release I/O removed from the head of the list. */ + async_socket_io_release(first_io); + } } /* Advances to the next reader in the list. * Param: * as - Initialized AsyncSocket instance. - * Return: - * Next reader at the head of the list, or NULL if reader list became empty. */ -static AsyncSocketIO* +static void _async_socket_advance_reader(AsyncSocket* as) { - return _async_socket_advance_io(as, &as->readers_head, &as->readers_tail); + _async_socket_advance_io(as, &as->readers_head, &as->readers_tail); } /* Advances to the next writer in the list. * Param: * as - Initialized AsyncSocket instance. - * Return: - * Next writer at the head of the list, or NULL if writer list became empty. */ -static AsyncSocketIO* +static void _async_socket_advance_writer(AsyncSocket* as) { - return _async_socket_advance_io(as, &as->writers_head, &as->writers_tail); + _async_socket_advance_io(as, &as->writers_head, &as->writers_tail); } /* Completes an I/O. @@ -552,7 +601,7 @@ _async_socket_cancel_readers(AsyncSocket* as) /* We ignore action returned from the cancellation callback, since we're * in a disconnected state here. */ _async_socket_cancel_io(as, to_cancel); - _async_socket_io_destroy(to_cancel); + async_socket_io_release(to_cancel); } } @@ -568,7 +617,7 @@ _async_socket_cancel_writers(AsyncSocket* as) /* We ignore action returned from the cancellation callback, since we're * in a disconnected state here. */ _async_socket_cancel_io(as, to_cancel); - _async_socket_io_destroy(to_cancel); + async_socket_io_release(to_cancel); } } @@ -607,11 +656,10 @@ _async_socket_close_socket(AsyncSocket* as) * as - Initialized AsyncSocket instance. */ static void -_async_socket_destroy(AsyncSocket* as) +_async_socket_free(AsyncSocket* as) { if (as != NULL) { - /* Cancel all the I/O */ - _async_socket_cancel_all_io(as); + D("AsyncSocket '%s' is destroyed", _async_socket_string(as)); /* Close socket. */ _async_socket_close_socket(as); @@ -684,8 +732,9 @@ _on_async_socket_disconnected(AsyncSocket* as) static AsyncIOAction _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio) { - D("Async socket '%s' I/O failure: %d -> %s", - _async_socket_string(as), errno, strerror(errno)); + D("Async socket '%s' %s I/O failure: %d -> %s", + _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", + errno, strerror(errno)); /* Report the failure. */ return _async_socket_io_failure(as, asio, errno); @@ -712,6 +761,9 @@ _on_async_socket_recv(AsyncSocket* as) return 0; } + /* Reference the reader while we're working with it in this callback. */ + async_socket_io_reference(asr); + /* Bump I/O state, and inform the client that I/O is in progress. */ if (asr->state == ASIO_STATE_QUEUED) { asr->state = ASIO_STATE_STARTED; @@ -724,7 +776,6 @@ _on_async_socket_recv(AsyncSocket* as) _async_socket_string(as)); /* Move on to the next reader. */ _async_socket_advance_reader(as); - _async_socket_io_destroy(asr); /* Lets see if there are still active readers, and enable, or disable * read I/O callback accordingly. */ if (as->readers_head != NULL) { @@ -732,6 +783,7 @@ _on_async_socket_recv(AsyncSocket* as) } else { loopIo_dontWantRead(as->io); } + async_socket_io_release(asr); return 0; } @@ -747,6 +799,7 @@ _on_async_socket_recv(AsyncSocket* as) /* Socket has been disconnected. */ errno = ECONNRESET; _on_async_socket_disconnected(as); + async_socket_io_release(asr); return -1; } @@ -754,6 +807,7 @@ _on_async_socket_recv(AsyncSocket* as) if (errno == EWOULDBLOCK || errno == EAGAIN) { /* Yield to writes behind this read. */ loopIo_wantRead(as->io); + async_socket_io_release(asr); return 0; } @@ -764,7 +818,6 @@ _on_async_socket_recv(AsyncSocket* as) _async_socket_string(as)); /* Move on to the next reader. */ _async_socket_advance_reader(as); - _async_socket_io_destroy(asr); /* Lets see if there are still active readers, and enable, or disable * read I/O callback accordingly. */ if (as->readers_head != NULL) { @@ -773,6 +826,7 @@ _on_async_socket_recv(AsyncSocket* as) loopIo_dontWantRead(as->io); } } + async_socket_io_release(asr); return -1; } @@ -784,7 +838,6 @@ _on_async_socket_recv(AsyncSocket* as) /* Notify reader completion. */ _async_socket_complete_io(as, asr); - _async_socket_io_destroy(asr); } /* Lets see if there are still active readers, and enable, or disable read @@ -795,6 +848,8 @@ _on_async_socket_recv(AsyncSocket* as) loopIo_dontWantRead(as->io); } + async_socket_io_release(asr); + return 0; } @@ -819,6 +874,9 @@ _on_async_socket_send(AsyncSocket* as) return 0; } + /* Reference the writer while we're working with it in this callback. */ + async_socket_io_reference(asw); + /* Bump I/O state, and inform the client that I/O is in progress. */ if (asw->state == ASIO_STATE_QUEUED) { asw->state = ASIO_STATE_STARTED; @@ -829,9 +887,8 @@ _on_async_socket_send(AsyncSocket* as) if (action == ASIO_ACTION_ABORT) { D("Write is aborted by the client of async socket '%s'", _async_socket_string(as)); - /* Move on to the next reader. */ - _async_socket_advance_reader(as); - _async_socket_io_destroy(asw); + /* Move on to the next writer. */ + _async_socket_advance_writer(as); /* Lets see if there are still active writers, and enable, or disable * write I/O callback accordingly. */ if (as->writers_head != NULL) { @@ -839,6 +896,7 @@ _on_async_socket_send(AsyncSocket* as) } else { loopIo_dontWantWrite(as->io); } + async_socket_io_release(asw); return 0; } @@ -854,6 +912,7 @@ _on_async_socket_send(AsyncSocket* as) /* Socket has been disconnected. */ errno = ECONNRESET; _on_async_socket_disconnected(as); + async_socket_io_release(asw); return -1; } @@ -861,18 +920,17 @@ _on_async_socket_send(AsyncSocket* as) if (errno == EWOULDBLOCK || errno == EAGAIN) { /* Yield to reads behind this write. */ loopIo_wantWrite(as->io); + async_socket_io_release(asw); return 0; } /* An I/O error. */ - /* An I/O error. */ action = _on_async_socket_failure(as, asw); if (action == ASIO_ACTION_ABORT) { D("Write is aborted on failure by the client of async socket '%s'", _async_socket_string(as)); - /* Move on to the next reader. */ - _async_socket_advance_reader(as); - _async_socket_io_destroy(asw); + /* Move on to the next writer. */ + _async_socket_advance_writer(as); /* Lets see if there are still active writers, and enable, or disable * write I/O callback accordingly. */ if (as->writers_head != NULL) { @@ -881,10 +939,11 @@ _on_async_socket_send(AsyncSocket* as) loopIo_dontWantWrite(as->io); } } + async_socket_io_release(asw); return -1; } - /* Update the reader descriptor. */ + /* Update the writer descriptor. */ asw->transferred += res; if (asw->transferred == asw->to_transfer) { /* This write is completed. Move on to the next writer. */ @@ -892,7 +951,6 @@ _on_async_socket_send(AsyncSocket* as) /* Notify writer completion. */ _async_socket_complete_io(as, asw); - _async_socket_io_destroy(asw); } /* Lets see if there are still active writers, and enable, or disable write @@ -903,6 +961,8 @@ _on_async_socket_send(AsyncSocket* as) loopIo_dontWantWrite(as->io); } + async_socket_io_release(asw); + return 0; } @@ -917,17 +977,24 @@ _on_async_socket_io(void* opaque, int fd, unsigned events) { AsyncSocket* const as = (AsyncSocket*)opaque; + /* Reference the socket while we're working with it in this callback. */ + async_socket_reference(as); + if ((events & LOOP_IO_READ) != 0) { if (_on_async_socket_recv(as) != 0) { + async_socket_release(as); return; } } if ((events & LOOP_IO_WRITE) != 0) { if (_on_async_socket_send(as) != 0) { + async_socket_release(as); return; } } + + async_socket_release(as); } /* A callback that is invoked by asynchronous socket connector on connection @@ -947,6 +1014,9 @@ _on_connector_events(void* opaque, AsyncIOAction action; AsyncSocket* const as = (AsyncSocket*)opaque; + /* Reference the socket while we're working with it in this callback. */ + async_socket_reference(as); + if (event == ASIO_STATE_SUCCEEDED) { /* Accept the connection. */ as->fd = async_socket_connector_pull_fd(connector); @@ -963,6 +1033,12 @@ _on_connector_events(void* opaque, _async_socket_close_socket(as); } + if (action != ASIO_ACTION_RETRY) { + async_socket_connector_release(connector); + } + + async_socket_release(as); + return action; } @@ -974,7 +1050,11 @@ void _on_async_socket_reconnect(void* opaque) { AsyncSocket* as = (AsyncSocket*)opaque; + + /* Reference the socket while we're working with it in this callback. */ + async_socket_reference(as); async_socket_connect(as, as->reconnect_to); + async_socket_release(as); } @@ -1002,13 +1082,14 @@ async_socket_new(int port, as->on_connection = client_cb; as->readers_head = as->readers_tail = NULL; as->reconnect_to = reconnect_to; + as->ref_count = 1; sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port); as->looper = looper_newCore(); if (as->looper == NULL) { E("Unable to create I/O looper for async socket '%s'", _async_socket_string(as)); client_cb(client_opaque, as, ASIO_STATE_FAILED); - _async_socket_destroy(as); + _async_socket_free(as); return NULL; } loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as); @@ -1016,6 +1097,28 @@ async_socket_new(int port, return as; } +int +async_socket_reference(AsyncSocket* as) +{ + assert(as->ref_count > 0); + as->ref_count++; + return as->ref_count; +} + +int +async_socket_release(AsyncSocket* as) +{ + assert(as->ref_count > 0); + as->ref_count--; + if (as->ref_count == 0) { + /* Last reference has been dropped. Destroy this object. */ + _async_socket_cancel_all_io(as); + _async_socket_free(as); + return 0; + } + return as->ref_count; +} + void async_socket_connect(AsyncSocket* as, int retry_to) { @@ -1034,7 +1137,6 @@ async_socket_disconnect(AsyncSocket* as) if (as != NULL) { _async_socket_cancel_all_io(as); _async_socket_close_socket(as); - _async_socket_destroy(as); } } @@ -1056,6 +1158,8 @@ async_socket_read_abs(AsyncSocket* as, AsyncSocketIO* const asr = _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque, deadline); + /* Add new reader to the list. Note that we use initial reference from I/O + * 'new' routine as "in the list" reference counter. */ if (as->readers_head == NULL) { as->readers_head = as->readers_tail = asr; } else { @@ -1087,6 +1191,8 @@ async_socket_write_abs(AsyncSocket* as, AsyncSocketIO* const asw = _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque, deadline); + /* Add new writer to the list. Note that we use initial reference from I/O + * 'new' routine as "in the list" reference counter. */ if (as->writers_head == NULL) { as->writers_head = as->writers_tail = asw; } else { @@ -1120,3 +1226,9 @@ async_socket_deadline(AsyncSocket* as, int rel) return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel : DURATION_INFINITE; } + +int +async_socket_get_port(const AsyncSocket* as) +{ + return sock_address_get_port(&as->address); +} diff --git a/android/async-socket.h b/android/async-socket.h index 1402126..661fae5 100644 --- a/android/async-socket.h +++ b/android/async-socket.h @@ -29,6 +29,11 @@ * Since all the operations (including connection) are asynchronous, all the * operation results are reported back to the client of this API via set of * callbacks that client supplied to this API. + * + * Since it's hard to control lifespan of an object in asynchronous environment, + * we make AsyncSocketConnector a referenced object, that will self-destruct when + * its reference count drops to zero, indicating that the last client has + * abandoned that object. */ /* Declares asynchronous socket descriptor. */ @@ -65,7 +70,26 @@ typedef AsyncIOAction (*on_as_io_cb)(void* io_opaque, * AsyncSocketIO API *******************************************************************************/ -/* Gets AsyncSocket instance for an I/O */ +/* References AsyncSocketIO object. + * Param: + * asio - Initialized AsyncSocketIO instance. + * Return: + * Number of outstanding references to the object. + */ +extern int async_socket_io_reference(AsyncSocketIO* asio); + +/* Releases AsyncSocketIO object. + * Note that upon exit from this routine the object might be destroyed, even if + * the routine returns value other than zero. + * Param: + * asio - Initialized AsyncSocketIO instance. + * Return: + * Number of outstanding references to the object. + */ +extern int async_socket_io_release(AsyncSocketIO* asio); + +/* Gets AsyncSocket instance for an I/O. Note that this routine will reference + * AsyncSocket instance before returning it to the caller. */ extern AsyncSocket* async_socket_io_get_socket(const AsyncSocketIO* asio); /* Cancels time out set for an I/O */ @@ -121,6 +145,24 @@ extern AsyncSocket* async_socket_new(int port, on_as_connection_cb connect_cb, void* client_opaque); +/* References AsyncSocket object. + * Param: + * as - Initialized AsyncSocket instance. + * Return: + * Number of outstanding references to the object. + */ +extern int async_socket_reference(AsyncSocket* as); + +/* Releases AsyncSocket object. + * Note that upon exit from this routine the object might be destroyed, even if + * the routine returns value other than zero. + * Param: + * as - Initialized AsyncSocket instance. + * Return: + * Number of outstanding references to the object. + */ +extern int async_socket_release(AsyncSocket* as); + /* Asynchronously connects to an asynchronous socket. * Note that connection result will be reported via callback passed to the * async_socket_new routine. @@ -216,4 +258,7 @@ extern Duration async_socket_deadline(AsyncSocket* as, int rel); /* Gets an opaque pointer associated with the socket's client */ extern void* async_socket_get_client_opaque(const AsyncSocket* as); +/* Gets TCP port for the socket. */ +extern int async_socket_get_port(const AsyncSocket* as); + #endif /* ANDROID_ASYNC_SOCKET_H_ */ diff --git a/android/async-utils.c b/android/async-utils.c index e410de0..9732111 100644 --- a/android/async-utils.c +++ b/android/async-utils.c @@ -268,3 +268,14 @@ asyncConnector_run(AsyncConnector* ac) return ASYNC_COMPLETE; } } + +int +asyncConnector_stop(AsyncConnector* ac) +{ + if (ac->state == CONNECT_CONNECTING) { + loopIo_dontWantWrite(ac->io); + ac->state = CONNECT_COMPLETED; + return 0; + } + return -1; +} diff --git a/android/async-utils.h b/android/async-utils.h index 6d460c2..30dfbe3 100644 --- a/android/async-utils.h +++ b/android/async-utils.h @@ -225,4 +225,12 @@ asyncConnector_init(AsyncConnector* ac, AsyncStatus asyncConnector_run(AsyncConnector* ac); +/* Stops connection in progress. + * Return: + * 0 if connection in progress has been stopped, or -1 if no connection has been + * in progress. + */ +int +asyncConnector_stop(AsyncConnector* ac); + #endif /* ANDROID_ASYNC_UTILS_H */ -- cgit v1.1