aboutsummaryrefslogtreecommitdiffstats
path: root/android
diff options
context:
space:
mode:
authorVladimir Chtchetkine <vchtchetkine@google.com>2012-04-03 10:27:12 -0700
committerVladimir Chtchetkine <vchtchetkine@google.com>2012-04-03 16:13:42 -0700
commitef4ccd385650612a830a098f4b1eac48482b65b3 (patch)
treedad74ea321507a3a0912a0142387e8df3bff901f /android
parent6dc5c2cef91004488f04fc6e9c0946f6d3a29705 (diff)
downloadexternal_qemu-ef4ccd385650612a830a098f4b1eac48482b65b3.zip
external_qemu-ef4ccd385650612a830a098f4b1eac48482b65b3.tar.gz
external_qemu-ef4ccd385650612a830a098f4b1eac48482b65b3.tar.bz2
Make all async I/O object referenced.
Since it's hard to control lifespan of an object in asynchronous environment, we should make all AsyncXxx objects a referenced objecst, that will self-destruct when its reference count drops to zero, indicating that the last client that used the object has abandoned it. Change-Id: I6f8194aa14e52a23a8772d827583782989654504
Diffstat (limited to 'android')
-rw-r--r--android/async-socket-connector.c112
-rw-r--r--android/async-socket-connector.h27
-rw-r--r--android/async-socket.c190
-rw-r--r--android/async-socket.h47
-rw-r--r--android/async-utils.c11
-rw-r--r--android/async-utils.h8
6 files changed, 316 insertions, 79 deletions
diff --git a/android/async-socket-connector.c b/android/async-socket-connector.c
index 41a13e0..80ec5bb 100644
--- a/android/async-socket-connector.c
+++ b/android/async-socket-connector.c
@@ -55,6 +55,8 @@ struct AsyncSocketConnector {
int retry_to;
/* Socket descriptor for the connection. */
int fd;
+ /* Number of outstanding references to the connector. */
+ int ref_count;
};
/* Asynchronous I/O looper callback invoked by the connector.
@@ -79,11 +81,20 @@ static void
_async_socket_connector_free(AsyncSocketConnector* connector)
{
if (connector != NULL) {
+ D("%s: Connector is destroyed.", _asc_socket_string(connector));
+ if (asyncConnector_stop(connector->connector) == 0) {
+ /* Connection was in progress. We need to destroy I/O descriptor for
+ * that connection. */
+ D("%s: Stopped async connection in progress.",
+ _asc_socket_string(connector));
+ loopIo_done(connector->connector_io);
+ }
if (connector->fd >= 0) {
socket_close(connector->fd);
}
if (connector->looper != NULL) {
+ loopTimer_stop(connector->connector_timer);
loopTimer_done(connector->connector_timer);
looper_free(connector->looper);
}
@@ -106,15 +117,13 @@ _async_socket_connector_open_socket(AsyncSocketConnector* connector)
/* Open socket. */
connector->fd = socket_create_inet(SOCKET_STREAM);
if (connector->fd < 0) {
- D("Unable to create AsyncSocketConnector socket for '%s'. Error: %d -> %s",
+ D("%s: Unable to create socket: %d -> %s",
_asc_socket_string(connector), errno, strerror(errno));
return -1;
}
/* Prepare for async I/O on the connector. */
socket_set_nonblock(connector->fd);
- loopIo_init(connector->connector_io, connector->looper, connector->fd,
- _on_async_socket_connector_io, connector);
return 0;
}
@@ -136,12 +145,11 @@ _async_socket_connector_close_socket(AsyncSocketConnector* connector)
/* Asynchronous connector (AsyncConnector instance) has completed connection
* attempt.
- *
- * NOTE: Upon exit from this routine AsyncSocketConnector instance might be
- * destroyed. So, once this routine is called, there must be no further
- * references to AsyncSocketConnector instance passed to this routine.
* Param:
- * connector - Initialized AsyncSocketConnector instance.
+ * connector - Initialized AsyncSocketConnector instance. Note: When this
+ * callback is called, the caller has referenced passed connector object,
+ * So, it's guaranteed that this connector is not going to be destroyed
+ * while this routine executes.
* status - Status of the connection attempt.
*/
static void
@@ -173,14 +181,11 @@ _on_async_socket_connector_connecting(AsyncSocketConnector* connector,
}
if (action == ASIO_ACTION_RETRY) {
- D("Retrying connection to socket '%s'", _asc_socket_string(connector));
+ D("%s: Retrying connection", _asc_socket_string(connector));
loopTimer_startRelative(connector->connector_timer, connector->retry_to);
- } else {
- if (action == ASIO_ACTION_ABORT) {
- D("%s: AsyncSocketConnector client for socket '%s' has aborted connection",
- __FUNCTION__, _asc_socket_string(connector));
- }
- _async_socket_connector_free(connector);
+ } else if (action == ASIO_ACTION_ABORT) {
+ D("%s: AsyncSocketConnector client for socket '%s' has aborted connection",
+ __FUNCTION__, _asc_socket_string(connector));
}
}
@@ -189,6 +194,9 @@ _on_async_socket_connector_io(void* opaque, int fd, unsigned events)
{
AsyncSocketConnector* const connector = (AsyncSocketConnector*)opaque;
+ /* Reference the connector while we're handing I/O. */
+ async_socket_connector_reference(connector);
+
/* Notify the client that another connection attempt is about to start. */
const AsyncIOAction action =
connector->on_connected_cb(connector->on_connected_cb_opaque,
@@ -200,8 +208,10 @@ _on_async_socket_connector_io(void* opaque, int fd, unsigned events)
} else {
D("%s: AsyncSocketConnector client for socket '%s' has aborted connection",
__FUNCTION__, _asc_socket_string(connector));
- _async_socket_connector_free(connector);
}
+
+ /* Release the connector after we're done with handing I/O. */
+ async_socket_connector_release(connector);
}
/* Retry connection timer callback.
@@ -214,30 +224,37 @@ _on_async_socket_connector_retry(void* opaque)
AsyncStatus status;
AsyncSocketConnector* const connector = (AsyncSocketConnector*)opaque;
+ /* Reference the connector while we're in callback. */
+ async_socket_connector_reference(connector);
+
/* Invoke the callback to notify about a connection retry attempt. */
AsyncIOAction action =
connector->on_connected_cb(connector->on_connected_cb_opaque,
connector, ASIO_STATE_RETRYING);
- if (action == ASIO_ACTION_ABORT) {
- D("%s: AsyncSocketConnector client for socket '%s' has aborted connection",
- __FUNCTION__, _asc_socket_string(connector));
- _async_socket_connector_free(connector);
- return;
- }
+ if (action != ASIO_ACTION_ABORT) {
+ /* Close handle opened for the previous (failed) attempt. */
+ _async_socket_connector_close_socket(connector);
- /* Close handle opened for the previous (failed) attempt. */
- _async_socket_connector_close_socket(connector);
+ /* Retry connection attempt. */
+ if (_async_socket_connector_open_socket(connector) == 0) {
+ loopIo_init(connector->connector_io, connector->looper,
+ connector->fd, _on_async_socket_connector_io, connector);
+ status = asyncConnector_init(connector->connector,
+ &connector->address,
+ connector->connector_io);
+ } else {
+ status = ASYNC_ERROR;
+ }
- /* Retry connection attempt. */
- if (_async_socket_connector_open_socket(connector) == 0) {
- status = asyncConnector_init(connector->connector, &connector->address,
- connector->connector_io);
+ _on_async_socket_connector_connecting(connector, status);
} else {
- status = ASYNC_ERROR;
+ D("%s: AsyncSocketConnector client for socket '%s' has aborted connection",
+ __FUNCTION__, _asc_socket_string(connector));
}
- _on_async_socket_connector_connecting(connector, status);
+ /* Release the connector after we're done with the callback. */
+ async_socket_connector_release(connector);
}
/********************************************************************************
@@ -265,6 +282,7 @@ async_socket_connector_new(const SockAddress* address,
connector->retry_to = retry_to;
connector->on_connected_cb = cb;
connector->on_connected_cb_opaque = cb_opaque;
+ connector->ref_count = 1;
/* Copy socket address. */
if (sock_address_get_family(address) == SOCKET_UNIX) {
@@ -275,11 +293,7 @@ async_socket_connector_new(const SockAddress* address,
/* Create a looper for asynchronous I/O. */
connector->looper = looper_newCore();
- if (connector->looper != NULL) {
- /* Create a timer that will be used for connection retries. */
- loopTimer_init(connector->connector_timer, connector->looper,
- _on_async_socket_connector_retry, connector);
- } else {
+ if (connector->looper == NULL) {
E("Unable to create I/O looper for AsyncSocketConnector for socket '%s'",
_asc_socket_string(connector));
cb(cb_opaque, connector, ASIO_STATE_FAILED);
@@ -287,9 +301,34 @@ async_socket_connector_new(const SockAddress* address,
return NULL;
}
+ /* Create a timer that will be used for connection retries. */
+ loopTimer_init(connector->connector_timer, connector->looper,
+ _on_async_socket_connector_retry, connector);
+
return connector;
}
+int
+async_socket_connector_reference(AsyncSocketConnector* connector)
+{
+ assert(connector->ref_count > 0);
+ connector->ref_count++;
+ return connector->ref_count;
+}
+
+int
+async_socket_connector_release(AsyncSocketConnector* connector)
+{
+ assert(connector->ref_count > 0);
+ connector->ref_count--;
+ if (connector->ref_count == 0) {
+ /* Last reference has been dropped. Destroy this object. */
+ _async_socket_connector_free(connector);
+ return 0;
+ }
+ return connector->ref_count;
+}
+
void
async_socket_connector_connect(AsyncSocketConnector* connector)
{
@@ -302,9 +341,10 @@ async_socket_connector_connect(AsyncSocketConnector* connector)
if (action == ASIO_ACTION_ABORT) {
D("%s: AsyncSocketConnector client for socket '%s' has aborted connection",
__FUNCTION__, _asc_socket_string(connector));
- _async_socket_connector_free(connector);
return;
} else {
+ loopIo_init(connector->connector_io, connector->looper,
+ connector->fd, _on_async_socket_connector_io, connector);
status = asyncConnector_init(connector->connector,
&connector->address,
connector->connector_io);
diff --git a/android/async-socket-connector.h b/android/async-socket-connector.h
index 4c05059..0769f56 100644
--- a/android/async-socket-connector.h
+++ b/android/async-socket-connector.h
@@ -58,9 +58,10 @@
* to cancel further connection attempts by returning ASIO_ACTION_ABORT, or it
* can allow another connection attempt by returning ASIO_ACTION_RETRY.
*
- * The client has no control over the lifespan of initialized connector instance.
- * It always self-destructs after client's cllback returns with a status other
- * than ASIO_ACTION_RETRY.
+ * Since it's hard to control lifespan of an object in asynchronous environment,
+ * we make AsyncSocketConnector a referenced object, that will self-destruct when
+ * its reference count drops to zero, indicating that the last client has
+ * abandoned that object.
*/
/* Declares async socket connector descriptor. */
@@ -81,6 +82,8 @@ typedef AsyncIOAction (*asc_event_cb)(void* opaque,
AsyncIOState event);
/* Creates and initializes AsyncSocketConnector instance.
+ * Note that upon exit from this routine the reference count to the returned
+ * object is set to 1.
* Param:
* address - Initialized socket address to connect to.
* retry_to - Timeout to retry a failed connection attempt in milliseconds.
@@ -98,6 +101,24 @@ extern AsyncSocketConnector* async_socket_connector_new(const SockAddress* addre
asc_event_cb cb,
void* cb_opaque);
+/* References AsyncSocketConnector object.
+ * Param:
+ * connector - Initialized AsyncSocketConnector instance.
+ * Return:
+ * Number of outstanding references to the object.
+ */
+extern int async_socket_connector_reference(AsyncSocketConnector* connector);
+
+/* Releases AsyncSocketConnector object.
+ * Note that upon exit from this routine the object might be destroyed, even if
+ * the routine returns value other than zero.
+ * Param:
+ * connector - Initialized AsyncSocketConnector instance.
+ * Return:
+ * Number of outstanding references to the object.
+ */
+extern int async_socket_connector_release(AsyncSocketConnector* connector);
+
/* Initiates asynchronous connection.
* Note that connection result will be reported via callback set with the call to
* async_socket_connector_new routine.
diff --git a/android/async-socket.c b/android/async-socket.c
index 4be69f6..a2577c9 100644
--- a/android/async-socket.c
+++ b/android/async-socket.c
@@ -76,6 +76,10 @@ struct AsyncSocketIO {
int is_io_read;
/* State of the I/O. */
AsyncIOState state;
+ /* Number of outstanding references to the I/O. */
+ int ref_count;
+ /* Deadline for this I/O */
+ Duration deadline;
};
/*
@@ -140,18 +144,24 @@ _async_socket_rw_new(AsyncSocket* as,
asio->on_io = io_cb;
asio->io_opaque = io_opaque;
asio->state = ASIO_STATE_QUEUED;
-
+ asio->ref_count = 1;
+ asio->deadline = deadline;
loopTimer_init(asio->timer, _async_socket_get_looper(as),
_on_async_socket_io_timed_out, asio);
loopTimer_startAbsolute(asio->timer, deadline);
+ /* Reference socket that is holding this I/O. */
+ async_socket_reference(as);
+
return asio;
}
/* Destroys and frees I/O descriptor. */
static void
-_async_socket_io_destroy(AsyncSocketIO* asio)
+_async_socket_io_free(AsyncSocketIO* asio)
{
+ AsyncSocket* const as = asio->as;
+
loopTimer_stop(asio->timer);
loopTimer_done(asio->timer);
@@ -163,6 +173,30 @@ _async_socket_io_destroy(AsyncSocketIO* asio)
} else {
AFREE(asio);
}
+
+ /* Release socket that is holding this I/O. */
+ async_socket_release(as);
+}
+
+int
+async_socket_io_reference(AsyncSocketIO* asio)
+{
+ assert(asio->ref_count > 0);
+ asio->ref_count++;
+ return asio->ref_count;
+}
+
+int
+async_socket_io_release(AsyncSocketIO* asio)
+{
+ assert(asio->ref_count > 0);
+ asio->ref_count--;
+ if (asio->ref_count == 0) {
+ /* Last reference has been dropped. Destroy this object. */
+ _async_socket_io_free(asio);
+ return 0;
+ }
+ return asio->ref_count;
}
/* Creates new asynchronous socket reader.
@@ -217,10 +251,16 @@ static void
_on_async_socket_io_timed_out(void* opaque)
{
AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
- const AsyncIOAction action = _async_socket_io_timed_out(asio->as, asio);
- if (action != ASIO_ACTION_RETRY) {
- _async_socket_io_destroy(asio);
- }
+ AsyncSocket* const as = asio->as;
+
+ D("%s: %s I/O (deadline = %lld) timed out at %lld",
+ _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
+ asio->deadline, async_socket_deadline(as, 0));
+
+ /* Reference while in callback. */
+ async_socket_io_reference(asio);
+ _async_socket_io_timed_out(asio->as, asio);
+ async_socket_io_release(asio);
}
/********************************************************************************
@@ -230,6 +270,7 @@ _on_async_socket_io_timed_out(void* opaque)
AsyncSocket*
async_socket_io_get_socket(const AsyncSocketIO* asio)
{
+ async_socket_reference(asio->as);
return asio->as;
}
@@ -318,6 +359,8 @@ struct AsyncSocket {
int fd;
/* Timeout to use for reconnection attempts. */
int reconnect_to;
+ /* Number of outstanding references to the socket. */
+ int ref_count;
};
static const char*
@@ -337,6 +380,8 @@ _async_socket_get_looper(AsyncSocket* as)
* as - Initialized AsyncSocket instance.
* Return:
* First I/O pulled out of the list, or NULL if there are no I/O in the list.
+ * Note that the caller is responsible for releasing the I/O object returned
+ * from this routine.
*/
static AsyncSocketIO*
_async_socket_pull_first_io(AsyncSocket* as,
@@ -360,6 +405,8 @@ _async_socket_pull_first_io(AsyncSocket* as,
* Return:
* First reader pulled out of the list, or NULL if there are no readers in the
* list.
+ * Note that the caller is responsible for releasing the I/O object returned
+ * from this routine.
*/
static AsyncSocketIO*
_async_socket_pull_first_reader(AsyncSocket* as)
@@ -373,6 +420,8 @@ _async_socket_pull_first_reader(AsyncSocket* as)
* Return:
* First writer pulled out of the list, or NULL if there are no writers in the
* list.
+ * Note that the caller is responsible for releasing the I/O object returned
+ * from this routine.
*/
static AsyncSocketIO*
_async_socket_pull_first_writer(AsyncSocket* as)
@@ -414,6 +463,9 @@ _async_socket_remove_io(AsyncSocket* as,
*list_tail = prev;
}
+ /* Release I/O adjusting reference added when I/O has been saved in the list. */
+ async_socket_io_release(io);
+
return 1;
}
@@ -421,10 +473,8 @@ _async_socket_remove_io(AsyncSocket* as,
* Param:
* as - Initialized AsyncSocket instance.
* list_head, list_tail - Pointers to the list head and tail.
- * Return:
- * Next I/O at the head of the list, or NULL if I/O list became empty.
*/
-static AsyncSocketIO*
+static void
_async_socket_advance_io(AsyncSocket* as,
AsyncSocketIO** list_head,
AsyncSocketIO** list_tail)
@@ -437,31 +487,30 @@ _async_socket_advance_io(AsyncSocket* as,
if (*list_head == NULL) {
*list_tail = NULL;
}
- return *list_head;
+ if (first_io != NULL) {
+ /* Release I/O removed from the head of the list. */
+ async_socket_io_release(first_io);
+ }
}
/* Advances to the next reader in the list.
* Param:
* as - Initialized AsyncSocket instance.
- * Return:
- * Next reader at the head of the list, or NULL if reader list became empty.
*/
-static AsyncSocketIO*
+static void
_async_socket_advance_reader(AsyncSocket* as)
{
- return _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
+ _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
}
/* Advances to the next writer in the list.
* Param:
* as - Initialized AsyncSocket instance.
- * Return:
- * Next writer at the head of the list, or NULL if writer list became empty.
*/
-static AsyncSocketIO*
+static void
_async_socket_advance_writer(AsyncSocket* as)
{
- return _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
+ _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
}
/* Completes an I/O.
@@ -552,7 +601,7 @@ _async_socket_cancel_readers(AsyncSocket* as)
/* We ignore action returned from the cancellation callback, since we're
* in a disconnected state here. */
_async_socket_cancel_io(as, to_cancel);
- _async_socket_io_destroy(to_cancel);
+ async_socket_io_release(to_cancel);
}
}
@@ -568,7 +617,7 @@ _async_socket_cancel_writers(AsyncSocket* as)
/* We ignore action returned from the cancellation callback, since we're
* in a disconnected state here. */
_async_socket_cancel_io(as, to_cancel);
- _async_socket_io_destroy(to_cancel);
+ async_socket_io_release(to_cancel);
}
}
@@ -607,11 +656,10 @@ _async_socket_close_socket(AsyncSocket* as)
* as - Initialized AsyncSocket instance.
*/
static void
-_async_socket_destroy(AsyncSocket* as)
+_async_socket_free(AsyncSocket* as)
{
if (as != NULL) {
- /* Cancel all the I/O */
- _async_socket_cancel_all_io(as);
+ D("AsyncSocket '%s' is destroyed", _async_socket_string(as));
/* Close socket. */
_async_socket_close_socket(as);
@@ -684,8 +732,9 @@ _on_async_socket_disconnected(AsyncSocket* as)
static AsyncIOAction
_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
{
- D("Async socket '%s' I/O failure: %d -> %s",
- _async_socket_string(as), errno, strerror(errno));
+ D("Async socket '%s' %s I/O failure: %d -> %s",
+ _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
+ errno, strerror(errno));
/* Report the failure. */
return _async_socket_io_failure(as, asio, errno);
@@ -712,6 +761,9 @@ _on_async_socket_recv(AsyncSocket* as)
return 0;
}
+ /* Reference the reader while we're working with it in this callback. */
+ async_socket_io_reference(asr);
+
/* Bump I/O state, and inform the client that I/O is in progress. */
if (asr->state == ASIO_STATE_QUEUED) {
asr->state = ASIO_STATE_STARTED;
@@ -724,7 +776,6 @@ _on_async_socket_recv(AsyncSocket* as)
_async_socket_string(as));
/* Move on to the next reader. */
_async_socket_advance_reader(as);
- _async_socket_io_destroy(asr);
/* Lets see if there are still active readers, and enable, or disable
* read I/O callback accordingly. */
if (as->readers_head != NULL) {
@@ -732,6 +783,7 @@ _on_async_socket_recv(AsyncSocket* as)
} else {
loopIo_dontWantRead(as->io);
}
+ async_socket_io_release(asr);
return 0;
}
@@ -747,6 +799,7 @@ _on_async_socket_recv(AsyncSocket* as)
/* Socket has been disconnected. */
errno = ECONNRESET;
_on_async_socket_disconnected(as);
+ async_socket_io_release(asr);
return -1;
}
@@ -754,6 +807,7 @@ _on_async_socket_recv(AsyncSocket* as)
if (errno == EWOULDBLOCK || errno == EAGAIN) {
/* Yield to writes behind this read. */
loopIo_wantRead(as->io);
+ async_socket_io_release(asr);
return 0;
}
@@ -764,7 +818,6 @@ _on_async_socket_recv(AsyncSocket* as)
_async_socket_string(as));
/* Move on to the next reader. */
_async_socket_advance_reader(as);
- _async_socket_io_destroy(asr);
/* Lets see if there are still active readers, and enable, or disable
* read I/O callback accordingly. */
if (as->readers_head != NULL) {
@@ -773,6 +826,7 @@ _on_async_socket_recv(AsyncSocket* as)
loopIo_dontWantRead(as->io);
}
}
+ async_socket_io_release(asr);
return -1;
}
@@ -784,7 +838,6 @@ _on_async_socket_recv(AsyncSocket* as)
/* Notify reader completion. */
_async_socket_complete_io(as, asr);
- _async_socket_io_destroy(asr);
}
/* Lets see if there are still active readers, and enable, or disable read
@@ -795,6 +848,8 @@ _on_async_socket_recv(AsyncSocket* as)
loopIo_dontWantRead(as->io);
}
+ async_socket_io_release(asr);
+
return 0;
}
@@ -819,6 +874,9 @@ _on_async_socket_send(AsyncSocket* as)
return 0;
}
+ /* Reference the writer while we're working with it in this callback. */
+ async_socket_io_reference(asw);
+
/* Bump I/O state, and inform the client that I/O is in progress. */
if (asw->state == ASIO_STATE_QUEUED) {
asw->state = ASIO_STATE_STARTED;
@@ -829,9 +887,8 @@ _on_async_socket_send(AsyncSocket* as)
if (action == ASIO_ACTION_ABORT) {
D("Write is aborted by the client of async socket '%s'",
_async_socket_string(as));
- /* Move on to the next reader. */
- _async_socket_advance_reader(as);
- _async_socket_io_destroy(asw);
+ /* Move on to the next writer. */
+ _async_socket_advance_writer(as);
/* Lets see if there are still active writers, and enable, or disable
* write I/O callback accordingly. */
if (as->writers_head != NULL) {
@@ -839,6 +896,7 @@ _on_async_socket_send(AsyncSocket* as)
} else {
loopIo_dontWantWrite(as->io);
}
+ async_socket_io_release(asw);
return 0;
}
@@ -854,6 +912,7 @@ _on_async_socket_send(AsyncSocket* as)
/* Socket has been disconnected. */
errno = ECONNRESET;
_on_async_socket_disconnected(as);
+ async_socket_io_release(asw);
return -1;
}
@@ -861,18 +920,17 @@ _on_async_socket_send(AsyncSocket* as)
if (errno == EWOULDBLOCK || errno == EAGAIN) {
/* Yield to reads behind this write. */
loopIo_wantWrite(as->io);
+ async_socket_io_release(asw);
return 0;
}
/* An I/O error. */
- /* An I/O error. */
action = _on_async_socket_failure(as, asw);
if (action == ASIO_ACTION_ABORT) {
D("Write is aborted on failure by the client of async socket '%s'",
_async_socket_string(as));
- /* Move on to the next reader. */
- _async_socket_advance_reader(as);
- _async_socket_io_destroy(asw);
+ /* Move on to the next writer. */
+ _async_socket_advance_writer(as);
/* Lets see if there are still active writers, and enable, or disable
* write I/O callback accordingly. */
if (as->writers_head != NULL) {
@@ -881,10 +939,11 @@ _on_async_socket_send(AsyncSocket* as)
loopIo_dontWantWrite(as->io);
}
}
+ async_socket_io_release(asw);
return -1;
}
- /* Update the reader descriptor. */
+ /* Update the writer descriptor. */
asw->transferred += res;
if (asw->transferred == asw->to_transfer) {
/* This write is completed. Move on to the next writer. */
@@ -892,7 +951,6 @@ _on_async_socket_send(AsyncSocket* as)
/* Notify writer completion. */
_async_socket_complete_io(as, asw);
- _async_socket_io_destroy(asw);
}
/* Lets see if there are still active writers, and enable, or disable write
@@ -903,6 +961,8 @@ _on_async_socket_send(AsyncSocket* as)
loopIo_dontWantWrite(as->io);
}
+ async_socket_io_release(asw);
+
return 0;
}
@@ -917,17 +977,24 @@ _on_async_socket_io(void* opaque, int fd, unsigned events)
{
AsyncSocket* const as = (AsyncSocket*)opaque;
+ /* Reference the socket while we're working with it in this callback. */
+ async_socket_reference(as);
+
if ((events & LOOP_IO_READ) != 0) {
if (_on_async_socket_recv(as) != 0) {
+ async_socket_release(as);
return;
}
}
if ((events & LOOP_IO_WRITE) != 0) {
if (_on_async_socket_send(as) != 0) {
+ async_socket_release(as);
return;
}
}
+
+ async_socket_release(as);
}
/* A callback that is invoked by asynchronous socket connector on connection
@@ -947,6 +1014,9 @@ _on_connector_events(void* opaque,
AsyncIOAction action;
AsyncSocket* const as = (AsyncSocket*)opaque;
+ /* Reference the socket while we're working with it in this callback. */
+ async_socket_reference(as);
+
if (event == ASIO_STATE_SUCCEEDED) {
/* Accept the connection. */
as->fd = async_socket_connector_pull_fd(connector);
@@ -963,6 +1033,12 @@ _on_connector_events(void* opaque,
_async_socket_close_socket(as);
}
+ if (action != ASIO_ACTION_RETRY) {
+ async_socket_connector_release(connector);
+ }
+
+ async_socket_release(as);
+
return action;
}
@@ -974,7 +1050,11 @@ void
_on_async_socket_reconnect(void* opaque)
{
AsyncSocket* as = (AsyncSocket*)opaque;
+
+ /* Reference the socket while we're working with it in this callback. */
+ async_socket_reference(as);
async_socket_connect(as, as->reconnect_to);
+ async_socket_release(as);
}
@@ -1002,13 +1082,14 @@ async_socket_new(int port,
as->on_connection = client_cb;
as->readers_head = as->readers_tail = NULL;
as->reconnect_to = reconnect_to;
+ as->ref_count = 1;
sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
as->looper = looper_newCore();
if (as->looper == NULL) {
E("Unable to create I/O looper for async socket '%s'",
_async_socket_string(as));
client_cb(client_opaque, as, ASIO_STATE_FAILED);
- _async_socket_destroy(as);
+ _async_socket_free(as);
return NULL;
}
loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
@@ -1016,6 +1097,28 @@ async_socket_new(int port,
return as;
}
+int
+async_socket_reference(AsyncSocket* as)
+{
+ assert(as->ref_count > 0);
+ as->ref_count++;
+ return as->ref_count;
+}
+
+int
+async_socket_release(AsyncSocket* as)
+{
+ assert(as->ref_count > 0);
+ as->ref_count--;
+ if (as->ref_count == 0) {
+ /* Last reference has been dropped. Destroy this object. */
+ _async_socket_cancel_all_io(as);
+ _async_socket_free(as);
+ return 0;
+ }
+ return as->ref_count;
+}
+
void
async_socket_connect(AsyncSocket* as, int retry_to)
{
@@ -1034,7 +1137,6 @@ async_socket_disconnect(AsyncSocket* as)
if (as != NULL) {
_async_socket_cancel_all_io(as);
_async_socket_close_socket(as);
- _async_socket_destroy(as);
}
}
@@ -1056,6 +1158,8 @@ async_socket_read_abs(AsyncSocket* as,
AsyncSocketIO* const asr =
_async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
deadline);
+ /* Add new reader to the list. Note that we use initial reference from I/O
+ * 'new' routine as "in the list" reference counter. */
if (as->readers_head == NULL) {
as->readers_head = as->readers_tail = asr;
} else {
@@ -1087,6 +1191,8 @@ async_socket_write_abs(AsyncSocket* as,
AsyncSocketIO* const asw =
_async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
deadline);
+ /* Add new writer to the list. Note that we use initial reference from I/O
+ * 'new' routine as "in the list" reference counter. */
if (as->writers_head == NULL) {
as->writers_head = as->writers_tail = asw;
} else {
@@ -1120,3 +1226,9 @@ async_socket_deadline(AsyncSocket* as, int rel)
return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
DURATION_INFINITE;
}
+
+int
+async_socket_get_port(const AsyncSocket* as)
+{
+ return sock_address_get_port(&as->address);
+}
diff --git a/android/async-socket.h b/android/async-socket.h
index 1402126..661fae5 100644
--- a/android/async-socket.h
+++ b/android/async-socket.h
@@ -29,6 +29,11 @@
* Since all the operations (including connection) are asynchronous, all the
* operation results are reported back to the client of this API via set of
* callbacks that client supplied to this API.
+ *
+ * Since it's hard to control lifespan of an object in asynchronous environment,
+ * we make AsyncSocketConnector a referenced object, that will self-destruct when
+ * its reference count drops to zero, indicating that the last client has
+ * abandoned that object.
*/
/* Declares asynchronous socket descriptor. */
@@ -65,7 +70,26 @@ typedef AsyncIOAction (*on_as_io_cb)(void* io_opaque,
* AsyncSocketIO API
*******************************************************************************/
-/* Gets AsyncSocket instance for an I/O */
+/* References AsyncSocketIO object.
+ * Param:
+ * asio - Initialized AsyncSocketIO instance.
+ * Return:
+ * Number of outstanding references to the object.
+ */
+extern int async_socket_io_reference(AsyncSocketIO* asio);
+
+/* Releases AsyncSocketIO object.
+ * Note that upon exit from this routine the object might be destroyed, even if
+ * the routine returns value other than zero.
+ * Param:
+ * asio - Initialized AsyncSocketIO instance.
+ * Return:
+ * Number of outstanding references to the object.
+ */
+extern int async_socket_io_release(AsyncSocketIO* asio);
+
+/* Gets AsyncSocket instance for an I/O. Note that this routine will reference
+ * AsyncSocket instance before returning it to the caller. */
extern AsyncSocket* async_socket_io_get_socket(const AsyncSocketIO* asio);
/* Cancels time out set for an I/O */
@@ -121,6 +145,24 @@ extern AsyncSocket* async_socket_new(int port,
on_as_connection_cb connect_cb,
void* client_opaque);
+/* References AsyncSocket object.
+ * Param:
+ * as - Initialized AsyncSocket instance.
+ * Return:
+ * Number of outstanding references to the object.
+ */
+extern int async_socket_reference(AsyncSocket* as);
+
+/* Releases AsyncSocket object.
+ * Note that upon exit from this routine the object might be destroyed, even if
+ * the routine returns value other than zero.
+ * Param:
+ * as - Initialized AsyncSocket instance.
+ * Return:
+ * Number of outstanding references to the object.
+ */
+extern int async_socket_release(AsyncSocket* as);
+
/* Asynchronously connects to an asynchronous socket.
* Note that connection result will be reported via callback passed to the
* async_socket_new routine.
@@ -216,4 +258,7 @@ extern Duration async_socket_deadline(AsyncSocket* as, int rel);
/* Gets an opaque pointer associated with the socket's client */
extern void* async_socket_get_client_opaque(const AsyncSocket* as);
+/* Gets TCP port for the socket. */
+extern int async_socket_get_port(const AsyncSocket* as);
+
#endif /* ANDROID_ASYNC_SOCKET_H_ */
diff --git a/android/async-utils.c b/android/async-utils.c
index e410de0..9732111 100644
--- a/android/async-utils.c
+++ b/android/async-utils.c
@@ -268,3 +268,14 @@ asyncConnector_run(AsyncConnector* ac)
return ASYNC_COMPLETE;
}
}
+
+int
+asyncConnector_stop(AsyncConnector* ac)
+{
+ if (ac->state == CONNECT_CONNECTING) {
+ loopIo_dontWantWrite(ac->io);
+ ac->state = CONNECT_COMPLETED;
+ return 0;
+ }
+ return -1;
+}
diff --git a/android/async-utils.h b/android/async-utils.h
index 6d460c2..30dfbe3 100644
--- a/android/async-utils.h
+++ b/android/async-utils.h
@@ -225,4 +225,12 @@ asyncConnector_init(AsyncConnector* ac,
AsyncStatus
asyncConnector_run(AsyncConnector* ac);
+/* Stops connection in progress.
+ * Return:
+ * 0 if connection in progress has been stopped, or -1 if no connection has been
+ * in progress.
+ */
+int
+asyncConnector_stop(AsyncConnector* ac);
+
#endif /* ANDROID_ASYNC_UTILS_H */