diff options
author | Aaron Whyte <awhyte@google.com> | 2013-10-28 17:18:06 -0700 |
---|---|---|
committer | Mike Lockwood <lockwood@google.com> | 2013-11-14 11:24:51 -0800 |
commit | 92863c14b7d36f74ec715b45ca6adc8bf95dc87c (patch) | |
tree | 2cf98925df54b3a6f064404cb3de0b4e36e05801 | |
parent | ab6ec384c456022f37a9c6183d3afbcefcb436a9 (diff) | |
download | hardware_libhardware-92863c14b7d36f74ec715b45ca6adc8bf95dc87c.zip hardware_libhardware-92863c14b7d36f74ec715b45ca6adc8bf95dc87c.tar.gz hardware_libhardware-92863c14b7d36f74ec715b45ca6adc8bf95dc87c.tar.bz2 |
MultiHal multithreaded polling
Change-Id: I3ebe380169eed1c8deeca2860d1788be6c14837e
-rw-r--r-- | modules/sensors/SensorEventQueue.cpp | 48 | ||||
-rw-r--r-- | modules/sensors/SensorEventQueue.h | 9 | ||||
-rw-r--r-- | modules/sensors/multihal.cpp | 158 | ||||
-rw-r--r-- | modules/sensors/tests/SensorEventQueue_test.cpp | 88 |
4 files changed, 146 insertions, 157 deletions
diff --git a/modules/sensors/SensorEventQueue.cpp b/modules/sensors/SensorEventQueue.cpp index c139944..00013de 100644 --- a/modules/sensors/SensorEventQueue.cpp +++ b/modules/sensors/SensorEventQueue.cpp @@ -17,55 +17,28 @@ #include <hardware/sensors.h> #include <algorithm> #include <pthread.h> - #include <linux/input.h> -#include <cutils/atomic.h> #include <cutils/log.h> #include "SensorEventQueue.h" SensorEventQueue::SensorEventQueue(int capacity) { mCapacity = capacity; + mStart = 0; mSize = 0; mData = new sensors_event_t[mCapacity]; - pthread_cond_init(&mDataAvailableCondition, NULL); pthread_cond_init(&mSpaceAvailableCondition, NULL); - pthread_mutex_init(&mMutex, NULL); } SensorEventQueue::~SensorEventQueue() { delete[] mData; mData = NULL; - pthread_cond_destroy(&mDataAvailableCondition); pthread_cond_destroy(&mSpaceAvailableCondition); - pthread_mutex_destroy(&mMutex); -} - -void SensorEventQueue::lock() { - pthread_mutex_lock(&mMutex); -} - -void SensorEventQueue::unlock() { - pthread_mutex_unlock(&mMutex); -} - -void SensorEventQueue::waitForSpaceAndLock() { - lock(); - while (mSize >= mCapacity) { - pthread_cond_wait(&mSpaceAvailableCondition, &mMutex); - } -} - -void SensorEventQueue::waitForDataAndLock() { - lock(); - while (mSize <= 0) { - pthread_cond_wait(&mDataAvailableCondition, &mMutex); - } } int SensorEventQueue::getWritableRegion(int requestedLength, sensors_event_t** out) { - if (mSize >= mCapacity || requestedLength <= 0) { + if (mSize == mCapacity || requestedLength <= 0) { *out = NULL; return 0; } @@ -88,9 +61,6 @@ int SensorEventQueue::getWritableRegion(int requestedLength, sensors_event_t** o void SensorEventQueue::markAsWritten(int count) { mSize += count; - if (mSize) { - pthread_cond_broadcast(&mDataAvailableCondition); - } } int SensorEventQueue::getSize() { @@ -98,13 +68,21 @@ int SensorEventQueue::getSize() { } sensors_event_t* SensorEventQueue::peek() { - if (mSize <= 0) return NULL; + if (mSize == 0) return NULL; return &mData[mStart]; } void SensorEventQueue::dequeue() { - if (mSize <= 0) return; + if (mSize == 0) return; + if (mSize == mCapacity) { + pthread_cond_broadcast(&mSpaceAvailableCondition); + } mSize--; mStart = (mStart + 1) % mCapacity; - pthread_cond_broadcast(&mSpaceAvailableCondition); +} + +void SensorEventQueue::waitForSpace(pthread_mutex_t* mutex) { + while (mSize == mCapacity) { + pthread_cond_wait(&mSpaceAvailableCondition, mutex); + } } diff --git a/modules/sensors/SensorEventQueue.h b/modules/sensors/SensorEventQueue.h index fd833fa..969d018 100644 --- a/modules/sensors/SensorEventQueue.h +++ b/modules/sensors/SensorEventQueue.h @@ -35,17 +35,11 @@ class SensorEventQueue { int mStart; // start of readable region int mSize; // number of readable items sensors_event_t* mData; - pthread_cond_t mDataAvailableCondition; pthread_cond_t mSpaceAvailableCondition; - pthread_mutex_t mMutex; public: SensorEventQueue(int capacity); ~SensorEventQueue(); - void lock(); - void unlock(); - void waitForSpaceAndLock(); - void waitForDataAndLock(); // Returns length of region, between zero and min(capacity, requestedLength). If there is any // writable space, it will return a region of at least one. Because it must return @@ -73,6 +67,9 @@ public: // This will decrease the size by one, freeing up the oldest readable event's slot for writing. // Only call while holding the lock. void dequeue(); + + // Blocks until space is available. No-op if there is already space. + void waitForSpace(pthread_mutex_t* mutex); }; #endif // SENSOREVENTQUEUE_H_ diff --git a/modules/sensors/multihal.cpp b/modules/sensors/multihal.cpp index 45099b5..52588c7 100644 --- a/modules/sensors/multihal.cpp +++ b/modules/sensors/multihal.cpp @@ -30,6 +30,7 @@ #include <stdio.h> #include <dlfcn.h> +#include <SensorEventQueue.h> // comment out to disable debug-level logging #define LOG_NDEBUG 0 @@ -41,6 +42,13 @@ static const int MAX_CONF_LINE_LENGTH = 1024; static pthread_mutex_t init_modules_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t init_sensors_mutex = PTHREAD_MUTEX_INITIALIZER; +// This mutex is shared by all queues +static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; + +// Used to pause the multihal poll(). Broadcasted by sub-polling tasks if waiting_for_data. +static pthread_cond_t data_available_cond = PTHREAD_COND_INITIALIZER; +bool waiting_for_data = false; + /* * Vector of sub modules, whose indexes are referred to ni this file as module_index. */ @@ -65,7 +73,7 @@ struct FullHandle { return localHandle < that.localHandle; } - bool operator=(const FullHandle &that) const { + bool operator==(const FullHandle &that) const { return moduleIndex == that.moduleIndex && localHandle == that.localHandle; } }; @@ -75,13 +83,12 @@ std::map<FullHandle, int> full_to_global; int next_global_handle = 1; static int assign_global_handle(int module_index, int local_handle) { - ALOGD("assign_global_handle %d %d", module_index, local_handle); int global_handle = next_global_handle++; - FullHandle *full_handle = new FullHandle(); - full_handle->moduleIndex = module_index; - full_handle->localHandle = local_handle; - full_to_global[*full_handle] = global_handle; - global_to_full[global_handle] = *full_handle; + FullHandle full_handle; + full_handle.moduleIndex = module_index; + full_handle.localHandle = local_handle; + full_to_global[full_handle] = global_handle; + global_to_full[global_handle] = full_handle; return global_handle; } @@ -90,12 +97,53 @@ static int get_local_handle(int global_handle) { } static int get_module_index(int global_handle) { - ALOGD("get_module_index %d", global_handle); + ALOGD("get_module_index for global_handle %d", global_handle); FullHandle f = global_to_full[global_handle]; ALOGD("FullHandle moduleIndex %d, localHandle %d", f.moduleIndex, f.localHandle); return f.moduleIndex; } +static const int SENSOR_EVENT_QUEUE_CAPACITY = 20; + +struct TaskContext { + sensors_poll_device_t* device; + SensorEventQueue* queue; +}; + +void *writerTask(void* ptr) { + ALOGD("writerTask STARTS"); + TaskContext* ctx = (TaskContext*)ptr; + sensors_poll_device_t* device = ctx->device; + SensorEventQueue* queue = ctx->queue; + sensors_event_t* buffer; + int eventsPolled; + while (1) { + ALOGD("writerTask before lock 1"); + pthread_mutex_lock(&queue_mutex); + ALOGD("writerTask before waitForSpace"); + queue->waitForSpace(&queue_mutex); + ALOGD("writerTask after waitForSpace"); + int bufferSize = queue->getWritableRegion(SENSOR_EVENT_QUEUE_CAPACITY, &buffer); + // Do blocking poll outside of lock + pthread_mutex_unlock(&queue_mutex); + + ALOGD("writerTask before poll() - bufferSize = %d", bufferSize); + eventsPolled = device->poll(device, buffer, bufferSize); + ALOGD("writerTask poll() got %d events.", eventsPolled); + + ALOGD("writerTask before lock 2"); + pthread_mutex_lock(&queue_mutex); + queue->markAsWritten(eventsPolled); + ALOGD("writerTask wrote %d events", eventsPolled); + if (waiting_for_data) { + ALOGD("writerTask - broadcast data_available_cond"); + pthread_cond_broadcast(&data_available_cond); + } + pthread_mutex_unlock(&queue_mutex); + } + // never actually returns + return NULL; +} /* * Cache of all sensors, with original handles replaced by global handles. @@ -124,15 +172,31 @@ struct sensors_poll_context_t { int close(); std::vector<hw_device_t*> sub_hw_devices; + std::vector<SensorEventQueue*> queues; + std::vector<pthread_t> threads; + int nextReadIndex; sensors_poll_device_t* get_v0_device_by_handle(int global_handle); sensors_poll_device_1_t* get_v1_device_by_handle(int global_handle); int get_device_version_by_handle(int global_handle); + + void copy_event_remap_handle(sensors_event_t* src, sensors_event_t* dest, int sub_index); }; void sensors_poll_context_t::addSubHwDevice(struct hw_device_t* sub_hw_device) { ALOGD("addSubHwDevice"); this->sub_hw_devices.push_back(sub_hw_device); + + SensorEventQueue *queue = new SensorEventQueue(SENSOR_EVENT_QUEUE_CAPACITY); + this->queues.push_back(queue); + + TaskContext* taskContext = new TaskContext(); + taskContext->device = (sensors_poll_device_t*) sub_hw_device; + taskContext->queue = queue; + + pthread_t writerThread; + pthread_create(&writerThread, NULL, writerTask, taskContext); + this->threads.push_back(writerThread); } sensors_poll_device_t* sensors_poll_context_t::get_v0_device_by_handle(int handle) { @@ -168,34 +232,60 @@ int sensors_poll_context_t::setDelay(int handle, int64_t ns) { return retval; } -int sensors_poll_context_t::poll(sensors_event_t *data, int count) { - ALOGD("poll"); - - // This only gets the first device. Parallel polling of multiple devices is coming soon. - int sub_index = 0; - sensors_poll_device_t* v0 = (sensors_poll_device_t*) this->sub_hw_devices[sub_index]; - - ALOGD("poll's blocking read begins..."); - int retval = v0->poll(v0, data, count); - ALOGD("...poll's blocking read ends"); - ALOGD("rewriting %d sensor handles...", retval); - // A normal event's "sensor" field is a local handles. Convert it to a global handle. +void sensors_poll_context_t::copy_event_remap_handle(sensors_event_t* dest, sensors_event_t* src, + int sub_index) { + memcpy(dest, src, sizeof(struct sensors_event_t)); + // A normal event's "sensor" field is a local handle. Convert it to a global handle. // A meta-data event must have its sensor set to 0, but it has a nested event // with a local handle that needs to be converted to a global handle. FullHandle full_handle; full_handle.moduleIndex = sub_index; - for (int i = 0; i < retval; i++) { - sensors_event_t *event = &data[i]; - // If it's a metadata event, rewrite the inner payload, not the sensor field. - if (event->type == SENSOR_TYPE_META_DATA) { - full_handle.localHandle = event->meta_data.sensor; - event->meta_data.sensor = full_to_global[full_handle]; - } else { - full_handle.localHandle = event->sensor; - event->sensor = full_to_global[full_handle]; + // If it's a metadata event, rewrite the inner payload, not the sensor field. + if (dest->type == SENSOR_TYPE_META_DATA) { + full_handle.localHandle = dest->meta_data.sensor; + dest->meta_data.sensor = full_to_global[full_handle]; + } else { + full_handle.localHandle = dest->sensor; + dest->sensor = full_to_global[full_handle]; + } +} + +int sensors_poll_context_t::poll(sensors_event_t *data, int maxReads) { + ALOGD("poll"); + int empties = 0; + int queueCount = (int)this->queues.size(); + int eventsRead = 0; + + pthread_mutex_lock(&queue_mutex); + while (eventsRead == 0) { + while (empties < queueCount && eventsRead < maxReads) { + SensorEventQueue* queue = this->queues.at(this->nextReadIndex); + ALOGD("queue size: %d", queue->getSize()); + sensors_event_t* event = queue->peek(); + if (event == NULL) { + empties++; + } else { + empties = 0; + this->copy_event_remap_handle(&data[eventsRead++], event, nextReadIndex); + queue->dequeue(); + } + this->nextReadIndex = (this->nextReadIndex + 1) % queueCount; + } + if (eventsRead == 0) { + // The queues have been scanned and none contain data. + // Wait for any of them to signal that there's data. + ALOGD("poll stopping to wait for data"); + waiting_for_data = true; + pthread_cond_wait(&data_available_cond, &queue_mutex); + waiting_for_data = false; + empties = 0; + ALOGD("poll done waiting for data"); } } - return retval; + pthread_mutex_unlock(&queue_mutex); + ALOGD("...poll's blocking read ends. Returning %d events.", eventsRead); + + return eventsRead; } int sensors_poll_context_t::batch(int handle, int flags, int64_t period_ns, int64_t timeout) { @@ -436,9 +526,13 @@ static void lazy_init_sensors_list() { static int module__get_sensors_list(struct sensors_module_t* module, struct sensor_t const** list) { - ALOGD("module__get_sensors_list"); + ALOGD("module__get_sensors_list start"); lazy_init_sensors_list(); *list = global_sensors_list; + ALOGD("global_sensors_count: %d", global_sensors_count); + for (int i = 0; i < global_sensors_count; i++) { + ALOGD("sensor type: %d", global_sensors_list[i].type); + } return global_sensors_count; } @@ -480,6 +574,8 @@ static int open_sensors(const struct hw_module_t* hw_module, const char* name, dev->proxy_device.batch = device__batch; dev->proxy_device.flush = device__flush; + dev->nextReadIndex = 0; + // Open() the subhal modules. Remember their devices in a vector parallel to sub_hw_modules. for (std::vector<hw_module_t*>::iterator it = sub_hw_modules->begin(); it != sub_hw_modules->end(); it++) { diff --git a/modules/sensors/tests/SensorEventQueue_test.cpp b/modules/sensors/tests/SensorEventQueue_test.cpp index 3b89964..cbe4377 100644 --- a/modules/sensors/tests/SensorEventQueue_test.cpp +++ b/modules/sensors/tests/SensorEventQueue_test.cpp @@ -2,6 +2,8 @@ #include <stdlib.h> #include <hardware/sensors.h> #include <pthread.h> +#include <cutils/atomic.h> + #include "SensorEventQueue.cpp" // Unit tests for the SensorEventQueue. @@ -78,93 +80,9 @@ bool testWrappingWriteSizeCounts() { return true; } -static const int TTOQ_EVENT_COUNT = 10000; - -struct TaskContext { - bool success; - SensorEventQueue* queue; -}; - -void* writerTask(void* ptr) { - printf("writerTask starts\n"); - TaskContext* ctx = (TaskContext*)ptr; - SensorEventQueue* queue = ctx->queue; - int totalWrites = 0; - sensors_event_t* buffer; - while (totalWrites < TTOQ_EVENT_COUNT) { - queue->waitForSpaceAndLock(); - int writableSize = queue->getWritableRegion(rand() % 10 + 1, &buffer); - queue->unlock(); - for (int i = 0; i < writableSize; i++) { - // serialize the events - buffer[i].timestamp = totalWrites++; - } - queue->lock(); - queue->markAsWritten(writableSize); - queue->unlock(); - } - printf("writerTask ends normally\n"); - return NULL; -} - -void* readerTask(void* ptr) { - printf("readerTask starts\n"); - TaskContext* ctx = (TaskContext*)ptr; - SensorEventQueue* queue = ctx->queue; - int totalReads = 0; - while (totalReads < TTOQ_EVENT_COUNT) { - queue->waitForDataAndLock(); - int maxReads = rand() % 20 + 1; - int reads = 0; - while (queue->getSize() && reads < maxReads) { - sensors_event_t* event = queue->peek(); - if (totalReads != event->timestamp) { - printf("FAILURE: readerTask expected timestamp %d; actual was %d\n", - totalReads, (int)(event->timestamp)); - ctx->success = false; - return NULL; - } - queue->dequeue(); - totalReads++; - reads++; - } - queue->unlock(); - } - printf("readerTask ends normally\n"); - return NULL; -} - - -// Create a short queue, and write and read a ton of data through it. -// Write serial timestamps into the events, and expect to read them in the right order. -bool testTwoThreadsOneQueue() { - printf("TEST testTwoThreadsOneQueue\n"); - SensorEventQueue* queue = new SensorEventQueue(100); - - TaskContext readerCtx; - readerCtx.success = true; - readerCtx.queue = queue; - - TaskContext writerCtx; - writerCtx.success = true; - writerCtx.queue = queue; - - pthread_t writer, reader; - pthread_create(&reader, NULL, readerTask, &readerCtx); - pthread_create(&writer, NULL, writerTask, &writerCtx); - - pthread_join(writer, NULL); - pthread_join(reader, NULL); - - printf("testTwoThreadsOneQueue done\n"); - return readerCtx.success && writerCtx.success; -} - - int main(int argc, char **argv) { if (testSimpleWriteSizeCounts() && - testWrappingWriteSizeCounts() && - testTwoThreadsOneQueue()) { + testWrappingWriteSizeCounts()) { printf("ALL PASSED\n"); } else { printf("SOMETHING FAILED\n"); |