diff options
Diffstat (limited to 'android/sdk-controller-socket.c')
-rw-r--r-- | android/sdk-controller-socket.c | 996 |
1 files changed, 753 insertions, 243 deletions
diff --git a/android/sdk-controller-socket.c b/android/sdk-controller-socket.c index a300169..8b0d813 100644 --- a/android/sdk-controller-socket.c +++ b/android/sdk-controller-socket.c @@ -20,8 +20,6 @@ * a TCP port forwarding, enabled by ADB. */ -#include "qemu-common.h" -#include "android/async-utils.h" #include "android/utils/debug.h" #include "android/async-socket-connector.h" #include "android/async-socket.h" @@ -34,7 +32,7 @@ #define D(...) VERBOSE_PRINT(sdkctlsocket,__VA_ARGS__) #define D_ACTIVE VERBOSE_CHECK(sdkctlsocket) -#define TRACE_ON 1 +#define TRACE_ON 0 #if TRACE_ON #define T(...) VERBOSE_PRINT(sdkctlsocket,__VA_ARGS__) @@ -53,12 +51,8 @@ struct SDKCtlRecycled { }; }; -/******************************************************************************** - * SDKCtlPacket declarations - *******************************************************************************/ - /* - * Types of the packets of data sent via SDK controller socket. + * Types of the data packets sent via SDK controller socket. */ /* The packet is a message. */ @@ -68,12 +62,56 @@ struct SDKCtlRecycled { /* The packet is a response to a query. */ #define SDKCTL_PACKET_QUERY_RESPONSE 3 +/* + * Types of intenal port messages sent via SDK controller socket. + */ + +/* Port is connected. + * This message is sent by SDK controller when the service connects a socket with + * a port that provides requested emulation functionality. + */ +#define SDKCTL_MSG_PORT_CONNECTED -1 +/* Port is disconnected. + * This message is sent by SDK controller when a port that provides requested + * emulation functionality disconnects from the socket. + */ +#define SDKCTL_MSG_PORT_DISCONNECTED -2 +/* Port is enabled. + * This message is sent by SDK controller when a port that provides requested + * emulation functionality is ready to do the emulation. + */ +#define SDKCTL_MSG_PORT_ENABLED -3 +/* Port is disabled. + * This message is sent by SDK controller when a port that provides requested + * emulation functionality is not ready to do the emulation. + */ +#define SDKCTL_MSG_PORT_DISABLED -4 + +/* + * Types of internal queries sent via SDK controller socket. + */ + +/* Handshake query. + * This query is sent to SDK controller service as part of the connection + * protocol implementation. + */ +#define SDKCTL_QUERY_HANDSHAKE -1 + +/******************************************************************************** + * SDKCtlPacket declarations + *******************************************************************************/ + +/* Packet signature value ('SDKC'). */ +static const int _sdkctl_packet_sig = 0x53444B43; + /* Data packet descriptor. * * All packets, sent and received via SDK controller socket begin with this * header, with packet data immediately following this header. */ typedef struct SDKCtlPacketHeader { + /* Signature. */ + int signature; /* Total size of the data to transfer with this packet, including this * header. The transferring data should immediatelly follow this header. */ int size; @@ -83,42 +121,60 @@ typedef struct SDKCtlPacketHeader { } SDKCtlPacketHeader; /* Packet descriptor, allocated by this API for data packets to be sent to SDK - * controller service on the device. + * controller. * * When packet descriptors are allocated by this API, they are allocated large * enough to contain this header, and packet data to send to the service, * immediately following this descriptor. */ -struct SDKCtlPacket { +typedef struct SDKCtlPacket { /* Supports recycling. Don't put anything in front: recycler expects this * to be the first field in recyclable descriptor. */ - SDKCtlRecycled recycling; + SDKCtlRecycled recycling; - /* Next packet in the list of packets to send. */ - SDKCtlPacket* next; /* SDK controller socket that transmits this packet. */ - SDKCtlSocket* sdkctl; + SDKCtlSocket* sdkctl; /* Number of outstanding references to the packet. */ - int ref_count; + int ref_count; /* Common packet header. Packet data immediately follows this header, so it - * must be last field in SDKCtlPacket descriptor. */ - SDKCtlPacketHeader header; -}; + * must be the last field in SDKCtlPacket descriptor. */ + SDKCtlPacketHeader header; +} SDKCtlPacket; /******************************************************************************** - * SDKCtlQuery declarations + * SDKCtlDirectPacket declarations *******************************************************************************/ -/* - * Types of queries sent via SDK controller socket. +/* Direct packet descriptor, allocated by this API for direct data packets to be + * sent to SDK controller service on the device. + * + * Direct packet (unlike SDKCtlPacket) don't contain data buffer, but rather + * reference data allocated by the client. This is useful when client sends large + * amount of data (such as framebuffer updates sent my multi-touch port), and + * regular packet descriptors for such large transfer cannot be obtained from the + * recycler. */ +struct SDKCtlDirectPacket { + /* Supports recycling. Don't put anything in front: recycler expects this + * to be the first field in recyclable descriptor. */ + SDKCtlRecycled recycling; -/* Handshake query. - * This query is sent to SDK controller service as part of the connection - * protocol implementation. - */ -#define SDKCTL_QUERY_HANDSHAKE -1 + /* SDKCtlSocket that owns this packet. */ + SDKCtlSocket* sdkctl; + /* Packet to send. */ + SDKCtlPacketHeader* packet; + /* Callback to invoke on packet transmission events. */ + on_sdkctl_direct_cb on_sent; + /* An opaque pointer to pass to on_sent callback. */ + void* on_sent_opaque; + /* Number of outstanding references to the packet. */ + int ref_count; +}; + +/******************************************************************************** + * SDKCtlQuery declarations + *******************************************************************************/ /* Query packet descriptor. * @@ -131,8 +187,7 @@ typedef struct SDKCtlQueryHeader { /* A unique query identifier. This ID is used to track the query in the * asynchronous environment in whcih SDK controller socket operates. */ int query_id; - /* Query type. See SDKCTL_QUERY_XXX for the list of query types used by SDK - * controller. */ + /* Query type. */ int query_type; } SDKCtlQueryHeader; @@ -148,12 +203,12 @@ struct SDKCtlQuery { * to be the first field in recyclable descriptor. */ SDKCtlRecycled recycling; - /* Next query in the list of active, or recycled queries. */ + /* Next query in the list of active queries. */ SDKCtlQuery* next; /* A timer to run time out on this query after it has been sent. */ LoopTimer timer[1]; /* Absolute time for this query's deadline. This is the value that query's - * timer is set for after query has been transmitted to the service. */ + * timer is set to after query has been transmitted to the service. */ Duration deadline; /* SDK controller socket that owns the query. */ SDKCtlSocket* sdkctl; @@ -163,20 +218,21 @@ struct SDKCtlQuery { void* query_opaque; /* Points to an address of a buffer where to save query response. */ void** response_buffer; - /* Points to a variable containing size of the response buffer (on the way in), - * or actual query response size (when query is completed). */ + /* Points to a variable containing size of the response buffer (on the way + * in), or actual query response size (when query is completed). */ uint32_t* response_size; /* Internal response buffer, allocated if query creator didn't provide its * own. This field is valid only if response_buffer field is NULL, or is * pointing to this field. */ void* internal_resp_buffer; - /* Internal response buffer size This field is valid only if response_size - * field is NULL, or is pointing to this field. */ + /* Internal response buffer size used if query creator didn't provide its + * own. This field is valid only if response_size field is NULL, or is + * pointing to this field. */ uint32_t internal_resp_size; /* Number of outstanding references to the query. */ int ref_count; - /* Common packet header. Query data immediately follows this header, so it + /* Common query header. Query data immediately follows this header, so it * must be last field in SDKCtlQuery descriptor. */ SDKCtlQueryHeader header; }; @@ -195,6 +251,34 @@ typedef struct SDKCtlQueryReplyHeader { } SDKCtlQueryReplyHeader; /******************************************************************************** + * SDKCtlMessage declarations + *******************************************************************************/ + +/* Message packet descriptor. + * + * All messages, sent and received via SDK controller socket begin with this + * header, with message data immediately following this header. + */ +typedef struct SDKCtlMessageHeader { + /* Data packet header for this query. */ + SDKCtlPacketHeader packet; + /* Message type. */ + int msg_type; +} SDKCtlMessageHeader; + +/* Message packet descriptor. + * + * All messages, sent and received via SDK controller socket begin with this + * header, with message data immediately following this header. + */ +struct SDKCtlMessage { + /* Data packet descriptor for this message. */ + SDKCtlPacket packet; + /* Message type. */ + int msg_type; +}; + +/******************************************************************************** * SDK Control Socket declarations *******************************************************************************/ @@ -229,13 +313,15 @@ typedef struct SDKCtlIODispatcher { /* Unites all types of headers used in SDK controller data exchange. */ union { /* Common packet header. */ - SDKCtlPacketHeader header; + SDKCtlPacketHeader packet_header; /* Header for a query packet. */ SDKCtlQueryHeader query_header; + /* Header for a message packet. */ + SDKCtlMessageHeader message_header; /* Header for a query response packet. */ SDKCtlQueryReplyHeader query_reply_header; }; - /* Descriptor of a packet packet received from SDK controller. */ + /* Descriptor of a packet that is being received from SDK controller. */ SDKCtlPacket* packet; /* A query for which a reply is currently being received. */ SDKCtlQuery* current_query; @@ -244,42 +330,43 @@ typedef struct SDKCtlIODispatcher { /* SDK controller socket descriptor. */ struct SDKCtlSocket { /* SDK controller socket state */ - SDKCtlSocketState state; + SDKCtlSocketState state; + /* SDK controller port status */ + SdkCtlPortStatus port_status; /* I/O dispatcher for the socket. */ - SDKCtlIODispatcher io_dispatcher; + SDKCtlIODispatcher io_dispatcher; /* Asynchronous socket connected to SDK Controller on the device. */ - AsyncSocket* as; + AsyncSocket* as; /* Client callback that monitors this socket connection. */ - on_sdkctl_connection_cb on_connection; - /* A callback to invoke when handshake message is received from the - * SDK controller. */ - on_sdkctl_handshake_cb on_handshake; + on_sdkctl_socket_connection_cb on_socket_connection; + /* Client callback that monitors SDK controller prt connection. */ + on_sdkctl_port_connection_cb on_port_connection; /* A callback to invoke when a message is received from the SDK controller. */ - on_sdkctl_message_cb on_message; + on_sdkctl_message_cb on_message; /* An opaque pointer associated with this socket. */ - void* opaque; - /* Name of an SDK controller service this socket is connected to. */ - char* service_name; + void* opaque; + /* Name of an SDK controller port this socket is connected to. */ + char* service_name; /* I/O looper for timers. */ - Looper* looper; + Looper* looper; /* Head of the active query list. */ - SDKCtlQuery* query_head; + SDKCtlQuery* query_head; /* Tail of the active query list. */ - SDKCtlQuery* query_tail; + SDKCtlQuery* query_tail; /* Query ID generator that gets incremented for each new query. */ - int next_query_id; + int next_query_id; /* Timeout before trying to reconnect after disconnection. */ - int reconnect_to; + int reconnect_to; /* Number of outstanding references to this descriptor. */ - int ref_count; + int ref_count; /* Head of the recycled memory */ - SDKCtlRecycled* recycler; + SDKCtlRecycled* recycler; /* Recyclable block size. */ - uint32_t recycler_block_size; + uint32_t recycler_block_size; /* Maximum number of blocks to recycle. */ - int recycler_max; + int recycler_max; /* Number of blocs in the recycler. */ - int recycler_count; + int recycler_count; }; /******************************************************************************** @@ -294,16 +381,18 @@ _sdkctl_socket_alloc_recycler(SDKCtlSocket* sdkctl, uint32_t size) SDKCtlRecycled* block = NULL; if (sdkctl->recycler != NULL && size <= sdkctl->recycler_block_size) { + assert(sdkctl->recycler_count > 0); /* There are blocks in the recycler, and requested size fits. */ block = sdkctl->recycler; sdkctl->recycler = block->next; block->size = sdkctl->recycler_block_size; sdkctl->recycler_count--; } else if (size <= sdkctl->recycler_block_size) { - /* There are no blocks in the recycler, but requested size fits. */ + /* There are no blocks in the recycler, but requested size fits. Lets + * allocate block that we can later recycle. */ block = malloc(sdkctl->recycler_block_size); if (block == NULL) { - APANIC("SDKCtl %s: Unable to allocate %d bytes block", + APANIC("SDKCtl %s: Unable to allocate %d bytes block.", sdkctl->service_name, sdkctl->recycler_block_size); } block->size = sdkctl->recycler_block_size; @@ -324,18 +413,19 @@ _sdkctl_socket_alloc_recycler(SDKCtlSocket* sdkctl, uint32_t size) static void _sdkctl_socket_free_recycler(SDKCtlSocket* sdkctl, void* mem) { - SDKCtlRecycled* block = (SDKCtlRecycled*)mem; + SDKCtlRecycled* const block = (SDKCtlRecycled*)mem; - if (sdkctl->recycler_count == sdkctl->recycler_max || - block->size != sdkctl->recycler_block_size) { - /* Recycler is full, or block cannot be recycled. */ + if (block->size != sdkctl->recycler_block_size || + sdkctl->recycler_count == sdkctl->recycler_max) { + /* Recycler is full, or block cannot be recycled. Just free the memory. */ free(mem); - return; + } else { + /* Add that block to the recycler. */ + assert(sdkctl->recycler_count >= 0); + block->next = sdkctl->recycler; + sdkctl->recycler = block; + sdkctl->recycler_count++; } - - block->next = sdkctl->recycler; - sdkctl->recycler = block; - sdkctl->recycler_count++; } /* Empties the recycler for a given SDKCtlSocket. */ @@ -344,7 +434,7 @@ _sdkctl_socket_empty_recycler(SDKCtlSocket* sdkctl) { SDKCtlRecycled* block = sdkctl->recycler; while (block != NULL) { - void* to_free = block; + void* const to_free = block; block = block->next; free(to_free); } @@ -366,6 +456,7 @@ _sdkctl_socket_add_query(SDKCtlQuery* query) { SDKCtlSocket* const sdkctl = query->sdkctl; if (sdkctl->query_head == NULL) { + assert(sdkctl->query_tail == NULL); sdkctl->query_head = sdkctl->query_tail = query; } else { sdkctl->query_tail->next = query; @@ -378,7 +469,9 @@ _sdkctl_socket_add_query(SDKCtlQuery* query) /* Removes a query from the list of active queries. * Param: - * query - Query to remove from the list of active queries. + * query - Query to remove from the list of active queries. If query has been + * removed from the list, it will be dereferenced to offset the reference + * that wad made when the query has been added to the list. * Return: * Boolean: 1 if query has been removed, or 0 if query has not been found in the * list of active queries. @@ -390,11 +483,11 @@ _sdkctl_socket_remove_query(SDKCtlQuery* query) SDKCtlQuery* prev = NULL; SDKCtlQuery* head = sdkctl->query_head; - /* Quick check: the query could be currently handled by dispatcher. */ + /* Quick check: the query could be currently handled by the dispatcher. */ if (sdkctl->io_dispatcher.current_query == query) { /* Release the query from dispatcher. */ - sdkctl_query_release(query); sdkctl->io_dispatcher.current_query = NULL; + sdkctl_query_release(query); return 1; } @@ -403,32 +496,34 @@ _sdkctl_socket_remove_query(SDKCtlQuery* query) prev = head; head = head->next; } - if (head != NULL) { - if (prev == NULL) { - /* Query is at the head of the list. */ - assert(query == sdkctl->query_head); - sdkctl->query_head = query->next; - } else { - /* Query is in the middle / at the end of the list. */ - assert(query != sdkctl->query_head); - prev->next = query->next; - } - if (sdkctl->query_tail == query) { - /* Query is at the tail of the list. */ - assert(query->next == NULL); - sdkctl->query_tail = prev; - } - query->next = NULL; + if (head == NULL) { + D("SDKCtl %s: Query %p is not found in the list.", + sdkctl->service_name, query); + return 0; + } - /* Release query that is now removed from the list. Note that query - * passed to this routine should hold an extra reference, owned by the - * caller. */ - sdkctl_query_release(query); - return 1; + if (prev == NULL) { + /* Query is at the head of the list. */ + assert(query == sdkctl->query_head); + sdkctl->query_head = query->next; } else { - D("%s: Query %p is not found in the list.", sdkctl->service_name, query); - return 0; + /* Query is in the middle / at the end of the list. */ + assert(query != sdkctl->query_head); + prev->next = query->next; + } + if (sdkctl->query_tail == query) { + /* Query is at the tail of the list. */ + assert(query->next == NULL); + sdkctl->query_tail = prev; } + query->next = NULL; + + /* Release query that is now removed from the list. Note that query + * passed to this routine should hold an extra reference, owned by the + * caller. */ + sdkctl_query_release(query); + + return 1; } /* Removes a query (based on query ID) from the list of active queries. @@ -437,11 +532,14 @@ _sdkctl_socket_remove_query(SDKCtlQuery* query) * query_id - Identifies the query to remove. * Return: * A query removed from the list of active queries, or NULL if query with the - * given ID has not been found in the list. + * given ID has not been found in the list. Note that query returned from this + * routine still holds the reference made when the query has been added to the + * list. */ static SDKCtlQuery* _sdkctl_socket_remove_query_id(SDKCtlSocket* sdkctl, int query_id) { + SDKCtlQuery* query = NULL; SDKCtlQuery* prev = NULL; SDKCtlQuery* head = sdkctl->query_head; @@ -449,7 +547,7 @@ _sdkctl_socket_remove_query_id(SDKCtlSocket* sdkctl, int query_id) if (sdkctl->io_dispatcher.current_query != NULL && sdkctl->io_dispatcher.current_query->header.query_id == query_id) { /* Release the query from dispatcher. */ - SDKCtlQuery* const query = sdkctl->io_dispatcher.current_query; + query = sdkctl->io_dispatcher.current_query; sdkctl->io_dispatcher.current_query = NULL; return query; } @@ -459,37 +557,38 @@ _sdkctl_socket_remove_query_id(SDKCtlSocket* sdkctl, int query_id) prev = head; head = head->next; } - if (head != NULL) { - /* Query is found in the list. */ - SDKCtlQuery* const query = head; - if (prev == NULL) { - /* Query is at the head of the list. */ - assert(query == sdkctl->query_head); - sdkctl->query_head = query->next; - } else { - /* Query is in the middle, or at the end of the list. */ - assert(query != sdkctl->query_head); - prev->next = query->next; - } - if (sdkctl->query_tail == query) { - /* Query is at the tail of the list. */ - assert(query->next == NULL); - sdkctl->query_tail = prev; - } - query->next = NULL; - return query; - } else { - D("%s: Query ID %d is not found in the list.", + if (head == NULL) { + D("SDKCtl %s: Query ID %d is not found in the list.", sdkctl->service_name, query_id); return NULL; } + + /* Query is found in the list. */ + query = head; + if (prev == NULL) { + /* Query is at the head of the list. */ + assert(query == sdkctl->query_head); + sdkctl->query_head = query->next; + } else { + /* Query is in the middle, or at the end of the list. */ + assert(query != sdkctl->query_head); + prev->next = query->next; + } + if (sdkctl->query_tail == query) { + /* Query is at the tail of the list. */ + assert(query->next == NULL); + sdkctl->query_tail = prev; + } + query->next = NULL; + + return query; } /* Pulls the first query from the list of active queries. * Param: * sdkctl - SDKCtlSocket instance that owns the query. * Return: - * A query removed pulled from the list of active queries, or NULL if query + * A query removed from the head of the list of active queries, or NULL if query * list is empty. */ static SDKCtlQuery* @@ -519,19 +618,24 @@ _sdkctl_socket_next_query_id(SDKCtlSocket* sdkctl) /* Alocates a packet. */ static SDKCtlPacket* -_sdkctl_packet_new(SDKCtlSocket* sdkctl, int size, int type) +_sdkctl_packet_new(SDKCtlSocket* sdkctl, uint32_t size, int type) { - const uint32_t total_size = sizeof(SDKCtlPacket) + size; - SDKCtlPacket* const packet = _sdkctl_socket_alloc_recycler(sdkctl, total_size); + /* Allocate packet descriptor large enough to contain packet data. */ + SDKCtlPacket* const packet = + _sdkctl_socket_alloc_recycler(sdkctl, sizeof(SDKCtlPacket) + size); - packet->sdkctl = sdkctl; - packet->ref_count = 1; - packet->header.size = size; - packet->header.type = type; + packet->sdkctl = sdkctl; + packet->ref_count = 1; + packet->header.signature = _sdkctl_packet_sig; + packet->header.size = size; + packet->header.type = type; /* Refence SDKCTlSocket that owns this packet. */ sdkctl_socket_reference(sdkctl); + T("SDKCtl %s: Packet %p of type %d is allocated for %d bytes transfer.", + sdkctl->service_name, packet, type, size); + return packet; } @@ -541,23 +645,27 @@ _sdkctl_packet_free(SDKCtlPacket* packet) { SDKCtlSocket* const sdkctl = packet->sdkctl; - /* Free allocated resources. */ + /* Recycle packet. */ _sdkctl_socket_free_recycler(packet->sdkctl, packet); + T("SDKCtl %s: Packet %p is freed.", sdkctl->service_name, packet); + /* Release SDKCTlSocket that owned this packet. */ sdkctl_socket_release(sdkctl); } +/* References a packet. */ int -sdkctl_packet_reference(SDKCtlPacket* packet) +_sdkctl_packet_reference(SDKCtlPacket* packet) { assert(packet->ref_count > 0); packet->ref_count++; return packet->ref_count; } +/* Releases a packet. */ int -sdkctl_packet_release(SDKCtlPacket* packet) +_sdkctl_packet_release(SDKCtlPacket* packet) { assert(packet->ref_count > 0); packet->ref_count--; @@ -569,6 +677,272 @@ sdkctl_packet_release(SDKCtlPacket* packet) return packet->ref_count; } +/* An I/O callback invoked on packet transmission. + * Param: + * io_opaque SDKCtlPacket instance of the packet that's being sent with this I/O. + * asio - Write I/O descriptor. + * status - I/O status. + */ +static AsyncIOAction +_on_sdkctl_packet_send_io(void* io_opaque, + AsyncSocketIO* asio, + AsyncIOState status) +{ + SDKCtlPacket* const packet = (SDKCtlPacket*)io_opaque; + AsyncIOAction action = ASIO_ACTION_DONE; + + /* Reference the packet while we're in this callback. */ + _sdkctl_packet_reference(packet); + + /* Lets see what's going on with query transmission. */ + switch (status) { + case ASIO_STATE_SUCCEEDED: + /* Packet has been sent to the service. */ + T("SDKCtl %s: Packet %p transmission has succeeded.", + packet->sdkctl->service_name, packet); + break; + + case ASIO_STATE_CANCELLED: + T("SDKCtl %s: Packet %p is cancelled.", + packet->sdkctl->service_name, packet); + break; + + case ASIO_STATE_FAILED: + T("SDKCtl %s: Packet %p has failed: %d -> %s", + packet->sdkctl->service_name, packet, errno, strerror(errno)); + break; + + case ASIO_STATE_FINISHED: + /* Time to disassociate the packet with the I/O. */ + _sdkctl_packet_release(packet); + break; + + default: + /* Transitional state. */ + break; + } + + _sdkctl_packet_release(packet); + + return action; +} + +/* Transmits a packet to SDK Controller. + * Param: + * packet - Packet to transmit. + */ +static void +_sdkctl_packet_transmit(SDKCtlPacket* packet) +{ + assert(packet->header.signature == _sdkctl_packet_sig); + + /* Reference to associate with the I/O */ + _sdkctl_packet_reference(packet); + + /* Transmit the packet to SDK controller. */ + async_socket_write_rel(packet->sdkctl->as, &packet->header, packet->header.size, + _on_sdkctl_packet_send_io, packet, -1); + + T("SDKCtl %s: Packet %p size %d is being sent.", + packet->sdkctl->service_name, packet, packet->header.size); +} + +/******************************************************************************** + * SDKCtlDirectPacket implementation + ********************************************************************************/ + +SDKCtlDirectPacket* +sdkctl_direct_packet_new(SDKCtlSocket* sdkctl) +{ + SDKCtlDirectPacket* const packet = + _sdkctl_socket_alloc_recycler(sdkctl, sizeof(SDKCtlDirectPacket)); + + packet->sdkctl = sdkctl; + packet->ref_count = 1; + + /* Refence SDKCTlSocket that owns this packet. */ + sdkctl_socket_reference(packet->sdkctl); + + T("SDKCtl %s: Direct packet %p is allocated.", sdkctl->service_name, packet); + + return packet; +} + +/* Frees a direct packet. */ +static void +_sdkctl_direct_packet_free(SDKCtlDirectPacket* packet) +{ + SDKCtlSocket* const sdkctl = packet->sdkctl; + + /* Free allocated resources. */ + _sdkctl_socket_free_recycler(packet->sdkctl, packet); + + T("SDKCtl %s: Direct packet %p is freed.", sdkctl->service_name, packet); + + /* Release SDKCTlSocket that owned this packet. */ + sdkctl_socket_release(sdkctl); +} + +/* References a packet. */ +int +sdkctl_direct_packet_reference(SDKCtlDirectPacket* packet) +{ + assert(packet->ref_count > 0); + packet->ref_count++; + return packet->ref_count; +} + +/* Releases a packet. */ +int +sdkctl_direct_packet_release(SDKCtlDirectPacket* packet) +{ + assert(packet->ref_count > 0); + packet->ref_count--; + if (packet->ref_count == 0) { + /* Last reference has been dropped. Destroy this object. */ + _sdkctl_direct_packet_free(packet); + return 0; + } + return packet->ref_count; +} + +/* An I/O callback invoked on direct packet transmission. + * Param: + * io_opaque SDKCtlDirectPacket instance of the packet that's being sent with + * this I/O. + * asio - Write I/O descriptor. + * status - I/O status. + */ +static AsyncIOAction +_on_sdkctl_direct_packet_send_io(void* io_opaque, + AsyncSocketIO* asio, + AsyncIOState status) +{ + SDKCtlDirectPacket* const packet = (SDKCtlDirectPacket*)io_opaque; + AsyncIOAction action = ASIO_ACTION_DONE; + + /* Reference the packet while we're in this callback. */ + sdkctl_direct_packet_reference(packet); + + /* Lets see what's going on with query transmission. */ + switch (status) { + case ASIO_STATE_SUCCEEDED: + /* Packet has been sent to the service. */ + T("SDKCtl %s: Direct packet %p transmission has succeeded.", + packet->sdkctl->service_name, packet); + packet->on_sent(packet->on_sent_opaque, packet, status); + break; + + case ASIO_STATE_CANCELLED: + T("SDKCtl %s: Direct packet %p is cancelled.", + packet->sdkctl->service_name, packet); + packet->on_sent(packet->on_sent_opaque, packet, status); + break; + + case ASIO_STATE_FAILED: + T("SDKCtl %s: Direct packet %p has failed: %d -> %s", + packet->sdkctl->service_name, packet, errno, strerror(errno)); + packet->on_sent(packet->on_sent_opaque, packet, status); + break; + + case ASIO_STATE_FINISHED: + /* Time to disassociate with the I/O. */ + sdkctl_direct_packet_release(packet); + break; + + default: + /* Transitional state. */ + break; + } + + sdkctl_direct_packet_release(packet); + + return action; +} + +void +sdkctl_direct_packet_send(SDKCtlDirectPacket* packet, + void* data, + on_sdkctl_direct_cb cb, + void* cb_opaque) +{ + packet->packet = (SDKCtlPacketHeader*)data; + packet->on_sent = cb; + packet->on_sent_opaque = cb_opaque; + assert(packet->packet->signature == _sdkctl_packet_sig); + + /* Reference for I/O */ + sdkctl_direct_packet_reference(packet); + + /* Transmit the packet to SDK controller. */ + async_socket_write_rel(packet->sdkctl->as, packet->packet, packet->packet->size, + _on_sdkctl_direct_packet_send_io, packet, -1); + + T("SDKCtl %s: Direct packet %p size %d is being sent", + packet->sdkctl->service_name, packet, packet->packet->size); +} + +/******************************************************************************** + * SDKCtlMessage implementation + *******************************************************************************/ + +/* Alocates a message descriptor. */ +static SDKCtlMessage* +_sdkctl_message_new(SDKCtlSocket* sdkctl, uint32_t msg_size, int msg_type) +{ + SDKCtlMessage* const msg = + (SDKCtlMessage*)_sdkctl_packet_new(sdkctl, + sizeof(SDKCtlMessageHeader) + msg_size, + SDKCTL_PACKET_MESSAGE); + msg->msg_type = msg_type; + + return msg; +} + +int +sdkctl_message_reference(SDKCtlMessage* msg) +{ + return _sdkctl_packet_reference(&msg->packet); +} + +int +sdkctl_message_release(SDKCtlMessage* msg) +{ + return _sdkctl_packet_release(&msg->packet); +} + +SDKCtlMessage* +sdkctl_message_send(SDKCtlSocket* sdkctl, + int msg_type, + const void* data, + uint32_t size) +{ + SDKCtlMessage* const msg = _sdkctl_message_new(sdkctl, size, msg_type); + if (size != 0 && data != NULL) { + memcpy(msg + 1, data, size); + } + _sdkctl_packet_transmit(&msg->packet); + + return msg; +} + +int +sdkctl_message_get_header_size(void) +{ + return sizeof(SDKCtlMessageHeader); +} + +void +sdkctl_init_message_header(void* msg, int msg_type, int msg_size) +{ + SDKCtlMessageHeader* const msg_header = (SDKCtlMessageHeader*)msg; + + msg_header->packet.signature = _sdkctl_packet_sig; + msg_header->packet.size = sizeof(SDKCtlMessageHeader) + msg_size; + msg_header->packet.type = SDKCTL_PACKET_MESSAGE; + msg_header->msg_type = msg_type; +} + /******************************************************************************** * SDKCtlQuery implementation *******************************************************************************/ @@ -579,19 +953,21 @@ _sdkctl_query_free(SDKCtlQuery* query) { if (query != NULL) { SDKCtlSocket* const sdkctl = query->sdkctl; - T("SDKCtl %s: Query %p ID %d is freed.", - sdkctl->service_name, query, query->header.query_id); - - /* Free allocated resources. */ if (query->internal_resp_buffer != NULL && (query->response_buffer == NULL || query->response_buffer == &query->internal_resp_buffer)) { + /* This query used its internal buffer to receive the response. + * Free it. */ free(query->internal_resp_buffer); } loopTimer_done(query->timer); + + /* Recyle the descriptor. */ _sdkctl_socket_free_recycler(sdkctl, query); + T("SDKCtl %s: Query %p is freed.", sdkctl->service_name, query); + /* Release socket that owned this query. */ sdkctl_socket_release(sdkctl); } @@ -609,8 +985,8 @@ _sdkctl_query_cancel_timeout(SDKCtlQuery* query) { loopTimer_stop(query->timer); - T("SDKCtl %s: Query %p ID %d deadline is cancelled.", - query->sdkctl->service_name, query, query->header.query_id); + T("SDKCtl %s: Query %p ID %d deadline %lld is cancelled.", + query->sdkctl->service_name, query, query->header.query_id, query->deadline); } /* @@ -639,7 +1015,7 @@ _on_sdkctl_query_cancelled(SDKCtlQuery* query) * Query cancellation means that SDK controller is disconnected. In turn, * this means that SDK controller socket will handle disconnection in its * connection callback. So, at this point all we need to do here is to inform - * the client, and then unlist the query. + * the client about query cancellation. */ /* Cancel deadline, and inform the client about query cancellation. */ @@ -676,12 +1052,11 @@ _on_skdctl_query_timeout(void* opaque) sdkctl_query_release(query); } -/* A callback that is invoked when query has been sent to the SDK controller - * service. */ +/* A callback that is invoked when query has been sent to the SDK controller. */ static void _on_sdkctl_query_sent(SDKCtlQuery* query) { - T("SDKCtl %s: sent %d bytes of query %p ID %d of type %d", + T("SDKCtl %s: Sent %d bytes of query %p ID %d of type %d", query->sdkctl->service_name, query->header.packet.size, query, query->header.query_id, query->header.query_type); @@ -710,31 +1085,25 @@ _on_sdkctl_query_send_io(void* io_opaque, /* Reference the query while we're in this callback. */ sdkctl_query_reference(query); - if (status == ASIO_STATE_SUCCEEDED) { - /* Query has been sent to the service. */ - _on_sdkctl_query_sent(query); - - sdkctl_query_release(query); - - return ASIO_ACTION_DONE; - } - /* Lets see what's going on with query transmission. */ switch (status) { + case ASIO_STATE_SUCCEEDED: + /* Query has been sent to the service. */ + _on_sdkctl_query_sent(query); + break; + case ASIO_STATE_CANCELLED: - T("SDKCtl %s: Query %p ID %d is cancelled in %s I/O.", - query->sdkctl->service_name, query, query->header.query_id, - async_socket_io_is_read(asio) ? "READ" : "WRITE"); + T("SDKCtl %s: Query %p ID %d is cancelled in transmission.", + query->sdkctl->service_name, query, query->header.query_id); /* Remove the query from the list of active queries. */ _sdkctl_socket_remove_query(query); _on_sdkctl_query_cancelled(query); break; case ASIO_STATE_TIMED_OUT: - D("SDKCtl %s: Query %p ID %d with deadline %lld has timed out in %s I/O at %lld", + D("SDKCtl %s: Query %p ID %d with deadline %lld has timed out in transmission at %lld", query->sdkctl->service_name, query, query->header.query_id, - query->deadline, async_socket_io_is_read(asio) ? "READ" : "WRITE", - async_socket_deadline(query->sdkctl->as, 0)); + query->deadline, async_socket_deadline(query->sdkctl->as, 0)); /* Invoke query's callback. */ action = query->query_cb(query->query_opaque, query, status); /* For actions other than retry we need to stop the query. */ @@ -744,9 +1113,8 @@ _on_sdkctl_query_send_io(void* io_opaque, break; case ASIO_STATE_FAILED: - T("SDKCtl %s: Query %p ID %d failed in %s I/O: %d -> %s", + T("SDKCtl %s: Query %p ID %d failed in transmission: %d -> %s", query->sdkctl->service_name, query, query->header.query_id, - async_socket_io_is_read(asio) ? "READ" : "WRITE", errno, strerror(errno)); /* Invoke query's callback. Note that we will let the client to * decide what to do on I/O failure. */ @@ -779,23 +1147,23 @@ _on_sdkctl_query_send_io(void* io_opaque, SDKCtlQuery* sdkctl_query_new(SDKCtlSocket* sdkctl, int query_type, uint32_t in_data_size) { - const uint32_t total_size = sizeof(SDKCtlQuery) + in_data_size; - - SDKCtlQuery* const query = _sdkctl_socket_alloc_recycler(sdkctl, total_size); - query->next = NULL; - query->sdkctl = sdkctl; - query->response_buffer = NULL; - query->response_size = NULL; - query->internal_resp_buffer = NULL; - query->internal_resp_size = 0; - query->query_cb = NULL; - query->query_opaque = NULL; - query->deadline = DURATION_INFINITE; - query->ref_count = 1; - query->header.packet.size = sizeof(SDKCtlQueryHeader) + in_data_size; - query->header.packet.type = SDKCTL_PACKET_QUERY; - query->header.query_id = _sdkctl_socket_next_query_id(sdkctl); - query->header.query_type = query_type; + SDKCtlQuery* const query = + _sdkctl_socket_alloc_recycler(sdkctl, sizeof(SDKCtlQuery) + in_data_size); + query->next = NULL; + query->sdkctl = sdkctl; + query->response_buffer = NULL; + query->response_size = NULL; + query->internal_resp_buffer = NULL; + query->internal_resp_size = 0; + query->query_cb = NULL; + query->query_opaque = NULL; + query->deadline = DURATION_INFINITE; + query->ref_count = 1; + query->header.packet.signature = _sdkctl_packet_sig; + query->header.packet.size = sizeof(SDKCtlQueryHeader) + in_data_size; + query->header.packet.type = SDKCTL_PACKET_QUERY; + query->header.query_id = _sdkctl_socket_next_query_id(sdkctl); + query->header.query_type = query_type; /* Initialize timer to fire up on query deadline expiration. */ loopTimer_init(query->timer, sdkctl->looper, _on_skdctl_query_timeout, query); @@ -822,21 +1190,19 @@ sdkctl_query_new_ex(SDKCtlSocket* sdkctl, { SDKCtlQuery* const query = sdkctl_query_new(sdkctl, query_type, in_data_size); - query->response_buffer = response_buffer; + query->response_buffer = response_buffer; if (query->response_buffer == NULL) { /* Creator didn't supply a buffer. Use internal one instead. */ query->response_buffer = &query->internal_resp_buffer; - query->internal_resp_buffer = NULL; } - query->response_size = response_size; + query->response_size = response_size; if (query->response_size == NULL) { /* Creator didn't supply a buffer for response size. Use internal one * instead. */ query->response_size = &query->internal_resp_size; - query->internal_resp_size = 0; } - query->query_cb = query_cb; - query->query_opaque = query_opaque; + query->query_cb = query_cb; + query->query_opaque = query_opaque; /* Init query's input buffer. */ if (in_data_size != 0 && in_data != NULL) { memcpy(query + 1, in_data, in_data_size); @@ -859,11 +1225,12 @@ sdkctl_query_send(SDKCtlQuery* query, int to) /* Reference query associated with write I/O. */ sdkctl_query_reference(query); + assert(query->header.packet.signature == _sdkctl_packet_sig); /* Transmit the query to SDK controller. */ async_socket_write_abs(sdkctl->as, &query->header, query->header.packet.size, _on_sdkctl_query_send_io, query, query->deadline); - T("SDKCtl %s: Query %p ID %d type %d is sent with deadline at %lld", + T("SDKCtl %s: Query %p ID %d type %d is being sent with deadline at %lld", query->sdkctl->service_name, query, query->header.query_id, query->header.query_type, query->deadline); } @@ -908,20 +1275,73 @@ sdkctl_query_release(SDKCtlQuery* query) return query->ref_count; } +void* +sdkctl_query_get_buffer_in(SDKCtlQuery* query) +{ + /* Query buffer starts right after the header. */ + return query + 1; +} + +void* +sdkctl_query_get_buffer_out(SDKCtlQuery* query) +{ + return query->response_buffer != NULL ? *query->response_buffer : + query->internal_resp_buffer; +} + /******************************************************************************** * SDKCtlPacket implementation *******************************************************************************/ -/* A packet has been received from SDK controller. */ +/* A packet has been received from SDK controller. + * Note that we expect the packet to be a message, since queries, and query + * replies are handled separately. */ static void _on_sdkctl_packet_received(SDKCtlSocket* sdkctl, SDKCtlPacket* packet) { T("SDKCtl %s: Received packet size: %d, type: %d", sdkctl->service_name, packet->header.size, packet->header.type); - /* Dispatch received packet to the client. */ - sdkctl->on_message(sdkctl->opaque, sdkctl, packet, packet->header.type, - packet + 1, packet->header.size - sizeof(SDKCtlPacketHeader)); + assert(packet->header.signature == _sdkctl_packet_sig); + if (packet->header.type == SDKCTL_PACKET_MESSAGE) { + SDKCtlMessage* const msg = (SDKCtlMessage*)packet; + /* Lets see if this is an internal protocol message. */ + switch (msg->msg_type) { + case SDKCTL_MSG_PORT_CONNECTED: + sdkctl->port_status = SDKCTL_PORT_CONNECTED; + sdkctl->on_port_connection(sdkctl->opaque, sdkctl, + SDKCTL_PORT_CONNECTED); + break; + + case SDKCTL_MSG_PORT_DISCONNECTED: + sdkctl->port_status = SDKCTL_PORT_DISCONNECTED; + sdkctl->on_port_connection(sdkctl->opaque, sdkctl, + SDKCTL_PORT_DISCONNECTED); + break; + + case SDKCTL_MSG_PORT_ENABLED: + sdkctl->port_status = SDKCTL_PORT_ENABLED; + sdkctl->on_port_connection(sdkctl->opaque, sdkctl, + SDKCTL_PORT_ENABLED); + break; + + case SDKCTL_MSG_PORT_DISABLED: + sdkctl->port_status = SDKCTL_PORT_DISABLED; + sdkctl->on_port_connection(sdkctl->opaque, sdkctl, + SDKCTL_PORT_DISABLED); + break; + + default: + /* This is a higher-level message. Dispatch the message to the + * client. */ + sdkctl->on_message(sdkctl->opaque, sdkctl, msg, msg->msg_type, msg + 1, + packet->header.size - sizeof(SDKCtlMessageHeader)); + break; + } + } else { + E("SDKCtl %s: Received unknown packet type %d size %d", + sdkctl->service_name, packet->header.type, packet->header.size); + } } /******************************************************************************** @@ -949,7 +1369,7 @@ _sdkctl_io_dispatcher_start(SDKCtlSocket* sdkctl) { dispatcher->current_query = NULL; /* Register a packet header reader with the socket. */ - async_socket_read_rel(dispatcher->sdkctl->as, &dispatcher->header, + async_socket_read_rel(dispatcher->sdkctl->as, &dispatcher->packet_header, sizeof(SDKCtlPacketHeader), _on_sdkctl_io_dispatcher_io, dispatcher, -1); } @@ -969,12 +1389,14 @@ _sdkctl_io_dispatcher_reset(SDKCtlSocket* sdkctl) { /* Free packet data buffer. */ if (dispatcher->packet != NULL) { - sdkctl_packet_release(dispatcher->packet); + _sdkctl_packet_release(dispatcher->packet); dispatcher->packet = NULL; } /* Reset dispatcher state. */ dispatcher->state = SDKCTL_IODISP_EXPECT_HEADER; + + T("SDKCtl %s: I/O Dispatcher is reset", sdkctl->service_name); } /* @@ -999,7 +1421,7 @@ _on_io_dispatcher_io_failure(SDKCtlIODispatcher* dispatcher, /* Report disconnection to the client, and let it restore connection in this * callback. */ - sdkctl->on_connection(sdkctl->opaque, sdkctl, ASIO_STATE_FAILED); + sdkctl->on_socket_connection(sdkctl->opaque, sdkctl, ASIO_STATE_FAILED); } /* A callback that is invoked when dispatcher's reader has been cancelled. */ @@ -1020,7 +1442,7 @@ _on_io_dispatcher_io_cancelled(SDKCtlIODispatcher* dispatcher, /* Discard packet data we've received so far. */ if (dispatcher->packet != NULL) { - sdkctl_packet_release(dispatcher->packet); + _sdkctl_packet_release(dispatcher->packet); dispatcher->packet = NULL; } } @@ -1033,8 +1455,20 @@ _on_io_dispatcher_packet_header(SDKCtlIODispatcher* dispatcher, SDKCtlSocket* const sdkctl = dispatcher->sdkctl; T("SDKCtl %s: Packet header type %d, size %d is received.", - dispatcher->sdkctl->service_name, dispatcher->header.type, - dispatcher->header.size); + dispatcher->sdkctl->service_name, dispatcher->packet_header.type, + dispatcher->packet_header.size); + + /* Make sure we have a valid packet header. */ + if (dispatcher->packet_header.signature != _sdkctl_packet_sig) { + E("SDKCtl %s: Invalid packet signature %x for packet type %d, size %d", + sdkctl->service_name, dispatcher->packet_header.signature, + dispatcher->packet_header.type, dispatcher->packet_header.size); + /* This is a protocol failure. Treat it as I/O failure: disconnect, and + * let the client to decide what to do next. */ + errno = EINVAL; + _on_io_dispatcher_io_failure(dispatcher, asio); + return ASIO_ACTION_DONE; + } /* Here we have three choices for the packet, that define the rest of * the data that follow it: @@ -1044,7 +1478,7 @@ _on_io_dispatcher_packet_header(SDKCtlIODispatcher* dispatcher, * Update the state accordingly, and initiate reading of the * remaining of the packet. */ - if (dispatcher->header.type == SDKCTL_PACKET_QUERY_RESPONSE) { + if (dispatcher->packet_header.type == SDKCTL_PACKET_QUERY_RESPONSE) { /* This is a response to the query. Before receiving response data we * need to locate the relevant query, and use its response buffer to read * the data. For that we need to obtain query ID firts. So, initiate @@ -1059,11 +1493,11 @@ _on_io_dispatcher_packet_header(SDKCtlIODispatcher* dispatcher, * there. */ dispatcher->state = SDKCTL_IODISP_EXPECT_DATA; dispatcher->packet = - _sdkctl_packet_new(sdkctl, dispatcher->header.size, - dispatcher->header.type); + _sdkctl_packet_new(sdkctl, dispatcher->packet_header.size, + dispatcher->packet_header.type); /* Initiate reading of the packet data. */ async_socket_read_rel(sdkctl->as, dispatcher->packet + 1, - dispatcher->header.size - sizeof(SDKCtlPacketHeader), + dispatcher->packet_header.size - sizeof(SDKCtlPacketHeader), _on_sdkctl_io_dispatcher_io, dispatcher, -1); } @@ -1075,18 +1509,19 @@ static AsyncIOAction _on_io_dispatcher_packet(SDKCtlIODispatcher* dispatcher, AsyncSocketIO* asio) { SDKCtlSocket* const sdkctl = dispatcher->sdkctl; + SDKCtlPacket* const packet = dispatcher->packet; + dispatcher->packet = NULL; T("SDKCtl %s: Packet type %d, size %d is received.", - dispatcher->sdkctl->service_name, dispatcher->header.type, - dispatcher->header.size); + dispatcher->sdkctl->service_name, dispatcher->packet_header.type, + dispatcher->packet_header.size); - _on_sdkctl_packet_received(sdkctl, dispatcher->packet); - sdkctl_packet_release(dispatcher->packet); - dispatcher->packet = NULL; + _on_sdkctl_packet_received(sdkctl, packet); + _sdkctl_packet_release(packet); /* Get ready for the next I/O cycle. */ dispatcher->state = SDKCTL_IODISP_EXPECT_HEADER; - async_socket_read_rel(sdkctl->as, &dispatcher->header, sizeof(SDKCtlPacketHeader), + async_socket_read_rel(sdkctl->as, &dispatcher->packet_header, sizeof(SDKCtlPacketHeader), _on_sdkctl_io_dispatcher_io, dispatcher, -1); return ASIO_ACTION_DONE; } @@ -1107,6 +1542,9 @@ _on_io_dispatcher_query_reply_header(SDKCtlIODispatcher* dispatcher, dispatcher->current_query = _sdkctl_socket_remove_query_id(sdkctl, dispatcher->query_reply_header.query_id); query = dispatcher->current_query; + const uint32_t query_data_size = + dispatcher->packet_header.size - sizeof(SDKCtlQueryReplyHeader); + dispatcher->state = SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA; if (query == NULL) { D("%s: Query #%d is not found by dispatcher", @@ -1116,13 +1554,12 @@ _on_io_dispatcher_query_reply_header(SDKCtlIODispatcher* dispatcher, * and then discard when it's over. */ dispatcher->state = SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA; dispatcher->packet = - _sdkctl_packet_new(sdkctl, dispatcher->header.size, - dispatcher->header.type); + _sdkctl_packet_new(sdkctl, dispatcher->packet_header.size, + dispatcher->packet_header.type); /* Copy query reply info to the packet. */ memcpy(&dispatcher->packet->header, &dispatcher->query_reply_header, sizeof(SDKCtlQueryReplyHeader)); - async_socket_read_rel(sdkctl->as, &dispatcher->query_header + 1, - dispatcher->header.size - sizeof(SDKCtlQueryReplyHeader), + async_socket_read_rel(sdkctl->as, dispatcher->packet + 1, query_data_size, _on_sdkctl_io_dispatcher_io, dispatcher, -1); } else { /* Prepare to receive query reply. For the simplicity sake, cancel query @@ -1130,9 +1567,6 @@ _on_io_dispatcher_query_reply_header(SDKCtlIODispatcher* dispatcher, * receiving query's reply. */ _sdkctl_query_cancel_timeout(query); - /* Adjust the reply buffer set for the query (if needed). */ - const uint32_t query_data_size = - dispatcher->header.size - sizeof(SDKCtlQueryReplyHeader); if (*query->response_size < query_data_size) { *query->response_buffer = malloc(query_data_size); if (*query->response_buffer == NULL) { @@ -1144,7 +1578,6 @@ _on_io_dispatcher_query_reply_header(SDKCtlIODispatcher* dispatcher, *query->response_size = query_data_size; /* Start reading query response. */ - dispatcher->state = SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA; async_socket_read_rel(sdkctl->as, *query->response_buffer, *query->response_size, _on_sdkctl_io_dispatcher_io, dispatcher, -1); @@ -1176,14 +1609,14 @@ _on_io_dispatcher_query_reply(SDKCtlIODispatcher* dispatcher, AsyncSocketIO* asi /* This was "read up in the air" for a cancelled query. Just discard the * read data. */ if (dispatcher->packet != NULL) { - sdkctl_packet_release(dispatcher->packet); + _sdkctl_packet_release(dispatcher->packet); dispatcher->packet = NULL; } } /* Get ready for the next I/O cycle. */ dispatcher->state = SDKCTL_IODISP_EXPECT_HEADER; - async_socket_read_rel(sdkctl->as, &dispatcher->header, sizeof(SDKCtlPacketHeader), + async_socket_read_rel(sdkctl->as, &dispatcher->packet_header, sizeof(SDKCtlPacketHeader), _on_sdkctl_io_dispatcher_io, dispatcher, -1); return ASIO_ACTION_DONE; } @@ -1351,6 +1784,7 @@ _sdkctl_socket_disconnect_socket(SDKCtlSocket* sdkctl) } sdkctl->state = SDKCTL_SOCKET_DISCONNECTED; + sdkctl->port_status = SDKCTL_PORT_DISCONNECTED; } /* Frees SDKCtlSocket instance. */ @@ -1358,6 +1792,8 @@ static void _sdkctl_socket_free(SDKCtlSocket* sdkctl) { if (sdkctl != NULL) { + T("SDKCtl %s: descriptor is destroing.", sdkctl->service_name); + /* Disconnect, and release the socket. */ if (sdkctl->as != NULL) { async_socket_disconnect(sdkctl->as); @@ -1394,7 +1830,7 @@ _on_async_socket_connected(SDKCtlSocket* sdkctl) /* Notify the client that connection is established. */ const AsyncIOAction action = - sdkctl->on_connection(sdkctl->opaque, sdkctl, ASIO_STATE_SUCCEEDED); + sdkctl->on_socket_connection(sdkctl->opaque, sdkctl, ASIO_STATE_SUCCEEDED); if (action == ASIO_ACTION_DONE) { /* Initialize, and start main I/O dispatcher. */ @@ -1418,8 +1854,8 @@ _on_async_socket_disconnected(SDKCtlSocket* sdkctl) _sdkctl_socket_disconnect_socket(sdkctl); - AsyncIOAction action = sdkctl->on_connection(sdkctl->opaque, sdkctl, - ASIO_STATE_FAILED); + AsyncIOAction action = sdkctl->on_socket_connection(sdkctl->opaque, sdkctl, + ASIO_STATE_FAILED); if (action == ASIO_ACTION_DONE) { /* Default action for disconnect is to reestablish the connection. */ action = ASIO_ACTION_RETRY; @@ -1481,35 +1917,38 @@ _on_async_socket_connection(void* client_opaque, SDKCtlSocket* sdkctl_socket_new(int reconnect_to, const char* service_name, - on_sdkctl_connection_cb on_connection, - on_sdkctl_handshake_cb on_handshake, + on_sdkctl_socket_connection_cb on_socket_connection, + on_sdkctl_port_connection_cb on_port_connection, on_sdkctl_message_cb on_message, void* opaque) { SDKCtlSocket* sdkctl; ANEW0(sdkctl); - sdkctl->state = SDKCTL_SOCKET_DISCONNECTED; - sdkctl->opaque = opaque; - sdkctl->service_name = ASTRDUP(service_name); - sdkctl->on_connection = on_connection; - sdkctl->on_handshake = on_handshake; - sdkctl->on_message = on_message; - sdkctl->reconnect_to = reconnect_to; - sdkctl->as = NULL; - sdkctl->next_query_id = 0; - sdkctl->query_head = sdkctl->query_tail = NULL; - sdkctl->ref_count = 1; - sdkctl->recycler = NULL; - sdkctl->recycler_block_size = 0; - sdkctl->recycler_max = 0; - sdkctl->recycler_count = 0; + sdkctl->state = SDKCTL_SOCKET_DISCONNECTED; + sdkctl->port_status = SDKCTL_PORT_DISCONNECTED; + sdkctl->opaque = opaque; + sdkctl->service_name = ASTRDUP(service_name); + sdkctl->on_socket_connection = on_socket_connection; + sdkctl->on_port_connection = on_port_connection; + sdkctl->on_message = on_message; + sdkctl->reconnect_to = reconnect_to; + sdkctl->as = NULL; + sdkctl->next_query_id = 0; + sdkctl->query_head = sdkctl->query_tail = NULL; + sdkctl->ref_count = 1; + sdkctl->recycler = NULL; + sdkctl->recycler_block_size = 0; + sdkctl->recycler_max = 0; + sdkctl->recycler_count = 0; + + T("SDKCtl %s: descriptor is created.", sdkctl->service_name); sdkctl->looper = looper_newCore(); if (sdkctl->looper == NULL) { E("Unable to create I/O looper for SDKCtl socket '%s'", service_name); - on_connection(opaque, sdkctl, ASIO_STATE_FAILED); + on_socket_connection(opaque, sdkctl, ASIO_STATE_FAILED); _sdkctl_socket_free(sdkctl); return NULL; } @@ -1569,7 +2008,7 @@ sdkctl_socket_connect(SDKCtlSocket* sdkctl, int port, int retry_to) if (sdkctl->as == NULL) { E("Unable to allocate AsyncSocket for SDKCtl socket '%s'", sdkctl->service_name); - sdkctl->on_connection(sdkctl->opaque, sdkctl, ASIO_STATE_FAILED); + sdkctl->on_socket_connection(sdkctl->opaque, sdkctl, ASIO_STATE_FAILED); } else { async_socket_connect(sdkctl->as, retry_to); } @@ -1599,11 +2038,54 @@ sdkctl_socket_disconnect(SDKCtlSocket* sdkctl) _sdkctl_socket_disconnect_socket(sdkctl); } +int +sdkctl_socket_is_connected(SDKCtlSocket* sdkctl) +{ + return (sdkctl->state == SDKCTL_SOCKET_CONNECTED) ? 1 : 0; +} + +int +sdkctl_socket_is_port_ready(SDKCtlSocket* sdkctl) +{ + return (sdkctl->port_status == SDKCTL_PORT_ENABLED) ? 1 : 0; +} + +SdkCtlPortStatus +sdkctl_socket_get_port_status(SDKCtlSocket* sdkctl) +{ + return sdkctl->port_status; +} + +int +sdkctl_socket_is_handshake_ok(SDKCtlSocket* sdkctl) +{ + switch (sdkctl->port_status) { + case SDKCTL_HANDSHAKE_DUP: + case SDKCTL_HANDSHAKE_UNKNOWN_QUERY: + case SDKCTL_HANDSHAKE_UNKNOWN_RESPONSE: + return 0; + default: + return 1; + } +} /******************************************************************************** * Handshake query *******************************************************************************/ +/* + * Handshake result values. + */ + +/* Handshake has succeeded completed, and service-side port is connected. */ +#define SDKCTL_HANDSHAKE_RESP_CONNECTED 0 +/* Handshake has succeeded completed, but service-side port is not connected. */ +#define SDKCTL_HANDSHAKE_RESP_NOPORT 1 +/* Handshake has failed due to duplicate connection request. */ +#define SDKCTL_HANDSHAKE_RESP_DUP -1 +/* Handshake has failed due to unknown query. */ +#define SDKCTL_HANDSHAKE_RESP_QUERY_UNKNOWN -2 + /* A callback that is ivoked on handshake I/O events. */ static AsyncIOAction _on_handshake_io(void* query_opaque, @@ -1613,12 +2095,41 @@ _on_handshake_io(void* query_opaque, SDKCtlSocket* const sdkctl = (SDKCtlSocket*)query_opaque; if (status == ASIO_STATE_SUCCEEDED) { - D("SDKCtl %s: %d bytes of handshake reply is received.", - sdkctl->service_name, *query->response_size); + const int* res = (const int*)(*query->response_buffer); + SdkCtlPortStatus handshake_status; + switch (*res) { + case SDKCTL_HANDSHAKE_RESP_CONNECTED: + D("SDKCtl %s: Handshake succeeded. Port is connected", + sdkctl->service_name); + handshake_status = SDKCTL_HANDSHAKE_CONNECTED; + break; + + case SDKCTL_HANDSHAKE_RESP_NOPORT: + D("SDKCtl %s: Handshake succeeded. Port is not connected", + sdkctl->service_name); + handshake_status = SDKCTL_HANDSHAKE_NO_PORT; + break; + + case SDKCTL_HANDSHAKE_RESP_DUP: + D("SDKCtl %s: Handshake failed: duplicate connection.", + sdkctl->service_name); + handshake_status = SDKCTL_HANDSHAKE_DUP; + break; - /* Handshake is received. Inform the client. */ - sdkctl->on_handshake(sdkctl->opaque, sdkctl, *query->response_buffer, - *query->response_size, status); + case SDKCTL_HANDSHAKE_RESP_QUERY_UNKNOWN: + D("SDKCtl %s: Handshake failed: unknown query.", + sdkctl->service_name); + handshake_status = SDKCTL_HANDSHAKE_UNKNOWN_QUERY; + break; + + default: + E("SDKCtl %s: Unknown handshake response: %d", + sdkctl->service_name, *res); + handshake_status = SDKCTL_HANDSHAKE_UNKNOWN_RESPONSE; + break; + } + sdkctl->port_status = handshake_status; + sdkctl->on_port_connection(sdkctl->opaque, sdkctl, handshake_status); } else { /* Something is going on with the handshake... */ switch (status) { @@ -1627,9 +2138,8 @@ _on_handshake_io(void* query_opaque, case ASIO_STATE_CANCELLED: D("SDKCtl %s: Handshake failed: I/O state %d. Error: %d -> %s", sdkctl->service_name, status, errno, strerror(errno)); - sdkctl->on_handshake(sdkctl->opaque, sdkctl, - *query->response_buffer, - *query->response_size, status); + sdkctl->on_socket_connection(sdkctl->opaque, sdkctl, + ASIO_STATE_FAILED); break; default: @@ -1663,7 +2173,7 @@ _on_sdkctl_endianness_io(void* io_opaque, case ASIO_STATE_CANCELLED: D("SDKCtl %s: endianness failed: I/O state %d. Error: %d -> %s", sdkctl->service_name, status, errno, strerror(errno)); - sdkctl->on_handshake(sdkctl->opaque, sdkctl, NULL, 0, status); + sdkctl->on_socket_connection(sdkctl->opaque, sdkctl, ASIO_STATE_FAILED); break; default: @@ -1682,7 +2192,7 @@ static const char _host_end = 0; static const char _host_end = 1; #endif - D("SDKCtl %s: Sending endianness: %d...", sdkctl->service_name, _host_end); + D("SDKCtl %s: Sending endianness: %d", sdkctl->service_name, _host_end); /* Before we can send any structured data to the SDK controller we need to * report endianness of the host. */ |