diff options
-rw-r--r-- | adb/sysdeps_win32.c | 205 |
1 files changed, 190 insertions, 15 deletions
diff --git a/adb/sysdeps_win32.c b/adb/sysdeps_win32.c index ced91e8..c426718 100644 --- a/adb/sysdeps_win32.c +++ b/adb/sysdeps_win32.c @@ -352,7 +352,7 @@ int adb_open(const char* path, int options) return -1; } } - + snprintf( f->name, sizeof(f->name), "%d(%s)", _fh_to_int(f), path ); D( "adb_open: '%s' => fd %d\n", path, _fh_to_int(f) ); return _fh_to_int(f); @@ -837,7 +837,7 @@ static void bip_dump_hex( const unsigned char* ptr, size_t len ) if (len2 > 8) len2 = 8; - for (nn = 0; nn < len2; nn++) + for (nn = 0; nn < len2; nn++) printf("%02x", ptr[nn]); printf(" "); @@ -994,7 +994,7 @@ Exit: SetEvent( bip->evt_read ); } - BIPD(( "bip_buffer_write: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n", + BIPD(( "bip_buffer_write: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n", bip->fdin, bip->fdout, count, bip->a_start, bip->a_end, bip->b_end, bip->can_write, bip->can_read )); LeaveCriticalSection( &bip->lock ); @@ -1018,7 +1018,7 @@ bip_buffer_read( BipBuffer bip, void* dst, int len ) LeaveCriticalSection( &bip->lock ); errno = EAGAIN; return -1; -#else +#else int ret; LeaveCriticalSection( &bip->lock ); @@ -1087,14 +1087,14 @@ Exit: } BIPDUMP( (const unsigned char*)dst - count, count ); - BIPD(( "bip_buffer_read: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n", + BIPD(( "bip_buffer_read: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n", bip->fdin, bip->fdout, count, bip->a_start, bip->a_end, bip->b_end, bip->can_write, bip->can_read )); LeaveCriticalSection( &bip->lock ); return count; } -typedef struct SocketPairRec_ +typedef struct SocketPairRec_ { BipBufferRec a2b_bip; BipBufferRec b2a_bip; @@ -1400,7 +1400,7 @@ event_looper_hook( EventLooper looper, int fd, int events ) f->clazz->_fh_hook( f, events & ~node->wanted, node ); node->wanted |= events; } else { - D("event_looper_hook: ignoring events %x for %d wanted=%x)\n", + D("event_looper_hook: ignoring events %x for %d wanted=%x)\n", events, fd, node->wanted); } } @@ -1426,6 +1426,180 @@ event_looper_unhook( EventLooper looper, int fd, int events ) } } +/* + * A fixer for WaitForMultipleObjects on condition that there are more than 64 + * handles to wait on. + * + * In cetain cases DDMS may establish more than 64 connections with ADB. For + * instance, this may happen if there are more than 64 processes running on a + * device, or there are multiple devices connected (including the emulator) with + * the combined number of running processes greater than 64. In this case using + * WaitForMultipleObjects to wait on connection events simply wouldn't cut, + * because of the API limitations (64 handles max). So, we need to provide a way + * to scale WaitForMultipleObjects to accept an arbitrary number of handles. The + * easiest (and "Microsoft recommended") way to do that would be dividing the + * handle array into chunks with the chunk size less than 64, and fire up as many + * waiting threads as there are chunks. Then each thread would wait on a chunk of + * handles, and will report back to the caller which handle has been set. + * Here is the implementation of that algorithm. + */ + +/* Number of handles to wait on in each wating thread. */ +#define WAIT_ALL_CHUNK_SIZE 63 + +/* Descriptor for a wating thread */ +typedef struct WaitForAllParam { + /* A handle to an event to signal when waiting is over. This handle is shared + * accross all the waiting threads, so each waiting thread knows when any + * other thread has exited, so it can exit too. */ + HANDLE main_event; + /* Upon exit from a waiting thread contains the index of the handle that has + * been signaled. The index is an absolute index of the signaled handle in + * the original array. This pointer is shared accross all the waiting threads + * and it's not guaranteed (due to a race condition) that when all the + * waiting threads exit, the value contained here would indicate the first + * handle that was signaled. This is fine, because the caller cares only + * about any handle being signaled. It doesn't care about the order, nor + * about the whole list of handles that were signaled. */ + LONG volatile *signaled_index; + /* Array of handles to wait on in a waiting thread. */ + HANDLE* handles; + /* Number of handles in 'handles' array to wait on. */ + int handles_count; + /* Index inside the main array of the first handle in the 'handles' array. */ + int first_handle_index; + /* Waiting thread handle. */ + HANDLE thread; +} WaitForAllParam; + +/* Waiting thread routine. */ +static unsigned __stdcall +_in_waiter_thread(void* arg) +{ + HANDLE wait_on[WAIT_ALL_CHUNK_SIZE + 1]; + int res; + WaitForAllParam* const param = (WaitForAllParam*)arg; + + /* We have to wait on the main_event in order to be notified when any of the + * sibling threads is exiting. */ + wait_on[0] = param->main_event; + /* The rest of the handles go behind the main event handle. */ + memcpy(wait_on + 1, param->handles, param->handles_count * sizeof(HANDLE)); + + res = WaitForMultipleObjects(param->handles_count + 1, wait_on, FALSE, INFINITE); + if (res > 0 && res < (param->handles_count + 1)) { + /* One of the original handles got signaled. Save its absolute index into + * the output variable. */ + InterlockedCompareExchange(param->signaled_index, + res - 1L + param->first_handle_index, -1L); + } + + /* Notify the caller (and the siblings) that the wait is over. */ + SetEvent(param->main_event); + + _endthreadex(0); + return 0; +} + +/* WaitForMultipeObjects fixer routine. + * Param: + * handles Array of handles to wait on. + * handles_count Number of handles in the array. + * Return: + * (>= 0 && < handles_count) - Index of the signaled handle in the array, or + * WAIT_FAILED on an error. + */ +static int +_wait_for_all(HANDLE* handles, int handles_count) +{ + WaitForAllParam* threads; + HANDLE main_event; + int chunks, chunk, remains; + + /* This variable is going to be accessed by several threads at the same time, + * this is bound to fail randomly when the core is run on multi-core machines. + * To solve this, we need to do the following (1 _and_ 2): + * 1. Use the "volatile" qualifier to ensure the compiler doesn't optimize + * out the reads/writes in this function unexpectedly. + * 2. Ensure correct memory ordering. The "simple" way to do that is to wrap + * all accesses inside a critical section. But we can also use + * InterlockedCompareExchange() which always provide a full memory barrier + * on Win32. + */ + volatile LONG sig_index = -1; + + /* Calculate number of chunks, and allocate thread param array. */ + chunks = handles_count / WAIT_ALL_CHUNK_SIZE; + remains = handles_count % WAIT_ALL_CHUNK_SIZE; + threads = (WaitForAllParam*)malloc((chunks + (remains ? 1 : 0)) * + sizeof(WaitForAllParam)); + if (threads == NULL) { + D("Unable to allocate thread array for %d handles.", handles_count); + return (int)WAIT_FAILED; + } + + /* Create main event to wait on for all waiting threads. This is a "manualy + * reset" event that will remain set once it was set. */ + main_event = CreateEvent(NULL, TRUE, FALSE, NULL); + if (main_event == NULL) { + D("Unable to create main event. Error: %d", GetLastError()); + free(threads); + return (int)WAIT_FAILED; + } + + /* + * Initialize waiting thread parameters. + */ + + for (chunk = 0; chunk < chunks; chunk++) { + threads[chunk].main_event = main_event; + threads[chunk].signaled_index = &sig_index; + threads[chunk].first_handle_index = WAIT_ALL_CHUNK_SIZE * chunk; + threads[chunk].handles = handles + threads[chunk].first_handle_index; + threads[chunk].handles_count = WAIT_ALL_CHUNK_SIZE; + } + if (remains) { + threads[chunk].main_event = main_event; + threads[chunk].signaled_index = &sig_index; + threads[chunk].first_handle_index = WAIT_ALL_CHUNK_SIZE * chunk; + threads[chunk].handles = handles + threads[chunk].first_handle_index; + threads[chunk].handles_count = remains; + chunks++; + } + + /* Start the waiting threads. */ + for (chunk = 0; chunk < chunks; chunk++) { + /* Note that using adb_thread_create is not appropriate here, since we + * need a handle to wait on for thread termination. */ + threads[chunk].thread = (HANDLE)_beginthreadex(NULL, 0, _in_waiter_thread, + &threads[chunk], 0, NULL); + if (threads[chunk].thread == NULL) { + /* Unable to create a waiter thread. Collapse. */ + D("Unable to create a waiting thread %d of %d. errno=%d", + chunk, chunks, errno); + chunks = chunk; + SetEvent(main_event); + break; + } + } + + /* Wait on any of the threads to get signaled. */ + WaitForSingleObject(main_event, INFINITE); + + /* Wait on all the waiting threads to exit. */ + for (chunk = 0; chunk < chunks; chunk++) { + WaitForSingleObject(threads[chunk].thread, INFINITE); + CloseHandle(threads[chunk].thread); + } + + CloseHandle(main_event); + free(threads); + + + const int ret = (int)InterlockedCompareExchange(&sig_index, -1, -1); + return (ret >= 0) ? ret : (int)WAIT_FAILED; +} + static EventLooperRec win32_looper; static void fdevent_init(void) @@ -1494,7 +1668,7 @@ static void fdevent_process() { looper->htab_count = 0; - for (hook = looper->hooks; hook; hook = hook->next) + for (hook = looper->hooks; hook; hook = hook->next) { if (hook->start && !hook->start(hook)) { D( "fdevent_process: error when starting a hook\n" ); @@ -1525,10 +1699,11 @@ static void fdevent_process() D( "adb_win32: waiting for %d events\n", looper->htab_count ); if (looper->htab_count > MAXIMUM_WAIT_OBJECTS) { - D("handle count %d exceeds MAXIMUM_WAIT_OBJECTS, aborting!\n", looper->htab_count); - abort(); + D("handle count %d exceeds MAXIMUM_WAIT_OBJECTS.\n", looper->htab_count); + wait_ret = _wait_for_all(looper->htab, looper->htab_count); + } else { + wait_ret = WaitForMultipleObjects( looper->htab_count, looper->htab, FALSE, INFINITE ); } - wait_ret = WaitForMultipleObjects( looper->htab_count, looper->htab, FALSE, INFINITE ); if (wait_ret == (int)WAIT_FAILED) { D( "adb_win32: wait failed, error %ld\n", GetLastError() ); } else { @@ -1669,7 +1844,7 @@ void fdevent_destroy(fdevent *fde) fdevent_remove(fde); } -void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg) +void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg) { memset(fde, 0, sizeof(fdevent)); fde->state = FDE_ACTIVE; @@ -1691,7 +1866,7 @@ void fdevent_remove(fdevent *fde) if(fde->state & FDE_ACTIVE) { fdevent_disconnect(fde); - dump_fde(fde, "disconnect"); + dump_fde(fde, "disconnect"); fdevent_unregister(fde); } @@ -1917,7 +2092,7 @@ static void _event_socketpair_prepare( EventHook hook ) if (hook->wanted & FDE_READ && rbip->can_read) hook->ready |= FDE_READ; - if (hook->wanted & FDE_WRITE && wbip->can_write) + if (hook->wanted & FDE_WRITE && wbip->can_write) hook->ready |= FDE_WRITE; } @@ -1938,7 +2113,7 @@ static void _event_socketpair_prepare( EventHook hook ) D("_event_socketpair_start: can't handle FDE_READ+FDE_WRITE\n" ); return 0; } - D( "_event_socketpair_start: hook %s for %x wanted=%x\n", + D( "_event_socketpair_start: hook %s for %x wanted=%x\n", hook->fh->name, _fh_to_int(fh), hook->wanted); return 1; } |