MultiHal multithreaded polling
Change-Id: I3ebe380169eed1c8deeca2860d1788be6c14837e
This commit is contained in:
committed by
Mike Lockwood
parent
ab6ec384c4
commit
92863c14b7
@@ -17,55 +17,28 @@
|
||||
#include <hardware/sensors.h>
|
||||
#include <algorithm>
|
||||
#include <pthread.h>
|
||||
|
||||
#include <linux/input.h>
|
||||
#include <cutils/atomic.h>
|
||||
#include <cutils/log.h>
|
||||
|
||||
#include "SensorEventQueue.h"
|
||||
|
||||
SensorEventQueue::SensorEventQueue(int capacity) {
|
||||
mCapacity = capacity;
|
||||
|
||||
mStart = 0;
|
||||
mSize = 0;
|
||||
mData = new sensors_event_t[mCapacity];
|
||||
pthread_cond_init(&mDataAvailableCondition, NULL);
|
||||
pthread_cond_init(&mSpaceAvailableCondition, NULL);
|
||||
pthread_mutex_init(&mMutex, NULL);
|
||||
}
|
||||
|
||||
SensorEventQueue::~SensorEventQueue() {
|
||||
delete[] mData;
|
||||
mData = NULL;
|
||||
pthread_cond_destroy(&mDataAvailableCondition);
|
||||
pthread_cond_destroy(&mSpaceAvailableCondition);
|
||||
pthread_mutex_destroy(&mMutex);
|
||||
}
|
||||
|
||||
void SensorEventQueue::lock() {
|
||||
pthread_mutex_lock(&mMutex);
|
||||
}
|
||||
|
||||
void SensorEventQueue::unlock() {
|
||||
pthread_mutex_unlock(&mMutex);
|
||||
}
|
||||
|
||||
void SensorEventQueue::waitForSpaceAndLock() {
|
||||
lock();
|
||||
while (mSize >= mCapacity) {
|
||||
pthread_cond_wait(&mSpaceAvailableCondition, &mMutex);
|
||||
}
|
||||
}
|
||||
|
||||
void SensorEventQueue::waitForDataAndLock() {
|
||||
lock();
|
||||
while (mSize <= 0) {
|
||||
pthread_cond_wait(&mDataAvailableCondition, &mMutex);
|
||||
}
|
||||
}
|
||||
|
||||
int SensorEventQueue::getWritableRegion(int requestedLength, sensors_event_t** out) {
|
||||
if (mSize >= mCapacity || requestedLength <= 0) {
|
||||
if (mSize == mCapacity || requestedLength <= 0) {
|
||||
*out = NULL;
|
||||
return 0;
|
||||
}
|
||||
@@ -88,9 +61,6 @@ int SensorEventQueue::getWritableRegion(int requestedLength, sensors_event_t** o
|
||||
|
||||
void SensorEventQueue::markAsWritten(int count) {
|
||||
mSize += count;
|
||||
if (mSize) {
|
||||
pthread_cond_broadcast(&mDataAvailableCondition);
|
||||
}
|
||||
}
|
||||
|
||||
int SensorEventQueue::getSize() {
|
||||
@@ -98,13 +68,21 @@ int SensorEventQueue::getSize() {
|
||||
}
|
||||
|
||||
sensors_event_t* SensorEventQueue::peek() {
|
||||
if (mSize <= 0) return NULL;
|
||||
if (mSize == 0) return NULL;
|
||||
return &mData[mStart];
|
||||
}
|
||||
|
||||
void SensorEventQueue::dequeue() {
|
||||
if (mSize <= 0) return;
|
||||
if (mSize == 0) return;
|
||||
if (mSize == mCapacity) {
|
||||
pthread_cond_broadcast(&mSpaceAvailableCondition);
|
||||
}
|
||||
mSize--;
|
||||
mStart = (mStart + 1) % mCapacity;
|
||||
pthread_cond_broadcast(&mSpaceAvailableCondition);
|
||||
}
|
||||
|
||||
void SensorEventQueue::waitForSpace(pthread_mutex_t* mutex) {
|
||||
while (mSize == mCapacity) {
|
||||
pthread_cond_wait(&mSpaceAvailableCondition, mutex);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,17 +35,11 @@ class SensorEventQueue {
|
||||
int mStart; // start of readable region
|
||||
int mSize; // number of readable items
|
||||
sensors_event_t* mData;
|
||||
pthread_cond_t mDataAvailableCondition;
|
||||
pthread_cond_t mSpaceAvailableCondition;
|
||||
pthread_mutex_t mMutex;
|
||||
|
||||
public:
|
||||
SensorEventQueue(int capacity);
|
||||
~SensorEventQueue();
|
||||
void lock();
|
||||
void unlock();
|
||||
void waitForSpaceAndLock();
|
||||
void waitForDataAndLock();
|
||||
|
||||
// Returns length of region, between zero and min(capacity, requestedLength). If there is any
|
||||
// writable space, it will return a region of at least one. Because it must return
|
||||
@@ -73,6 +67,9 @@ public:
|
||||
// This will decrease the size by one, freeing up the oldest readable event's slot for writing.
|
||||
// Only call while holding the lock.
|
||||
void dequeue();
|
||||
|
||||
// Blocks until space is available. No-op if there is already space.
|
||||
void waitForSpace(pthread_mutex_t* mutex);
|
||||
};
|
||||
|
||||
#endif // SENSOREVENTQUEUE_H_
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
|
||||
#include <stdio.h>
|
||||
#include <dlfcn.h>
|
||||
#include <SensorEventQueue.h>
|
||||
|
||||
// comment out to disable debug-level logging
|
||||
#define LOG_NDEBUG 0
|
||||
@@ -41,6 +42,13 @@ static const int MAX_CONF_LINE_LENGTH = 1024;
|
||||
static pthread_mutex_t init_modules_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
static pthread_mutex_t init_sensors_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
// This mutex is shared by all queues
|
||||
static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
// Used to pause the multihal poll(). Broadcasted by sub-polling tasks if waiting_for_data.
|
||||
static pthread_cond_t data_available_cond = PTHREAD_COND_INITIALIZER;
|
||||
bool waiting_for_data = false;
|
||||
|
||||
/*
|
||||
* Vector of sub modules, whose indexes are referred to ni this file as module_index.
|
||||
*/
|
||||
@@ -65,7 +73,7 @@ struct FullHandle {
|
||||
return localHandle < that.localHandle;
|
||||
}
|
||||
|
||||
bool operator=(const FullHandle &that) const {
|
||||
bool operator==(const FullHandle &that) const {
|
||||
return moduleIndex == that.moduleIndex && localHandle == that.localHandle;
|
||||
}
|
||||
};
|
||||
@@ -75,13 +83,12 @@ std::map<FullHandle, int> full_to_global;
|
||||
int next_global_handle = 1;
|
||||
|
||||
static int assign_global_handle(int module_index, int local_handle) {
|
||||
ALOGD("assign_global_handle %d %d", module_index, local_handle);
|
||||
int global_handle = next_global_handle++;
|
||||
FullHandle *full_handle = new FullHandle();
|
||||
full_handle->moduleIndex = module_index;
|
||||
full_handle->localHandle = local_handle;
|
||||
full_to_global[*full_handle] = global_handle;
|
||||
global_to_full[global_handle] = *full_handle;
|
||||
FullHandle full_handle;
|
||||
full_handle.moduleIndex = module_index;
|
||||
full_handle.localHandle = local_handle;
|
||||
full_to_global[full_handle] = global_handle;
|
||||
global_to_full[global_handle] = full_handle;
|
||||
return global_handle;
|
||||
}
|
||||
|
||||
@@ -90,12 +97,53 @@ static int get_local_handle(int global_handle) {
|
||||
}
|
||||
|
||||
static int get_module_index(int global_handle) {
|
||||
ALOGD("get_module_index %d", global_handle);
|
||||
ALOGD("get_module_index for global_handle %d", global_handle);
|
||||
FullHandle f = global_to_full[global_handle];
|
||||
ALOGD("FullHandle moduleIndex %d, localHandle %d", f.moduleIndex, f.localHandle);
|
||||
return f.moduleIndex;
|
||||
}
|
||||
|
||||
static const int SENSOR_EVENT_QUEUE_CAPACITY = 20;
|
||||
|
||||
struct TaskContext {
|
||||
sensors_poll_device_t* device;
|
||||
SensorEventQueue* queue;
|
||||
};
|
||||
|
||||
void *writerTask(void* ptr) {
|
||||
ALOGD("writerTask STARTS");
|
||||
TaskContext* ctx = (TaskContext*)ptr;
|
||||
sensors_poll_device_t* device = ctx->device;
|
||||
SensorEventQueue* queue = ctx->queue;
|
||||
sensors_event_t* buffer;
|
||||
int eventsPolled;
|
||||
while (1) {
|
||||
ALOGD("writerTask before lock 1");
|
||||
pthread_mutex_lock(&queue_mutex);
|
||||
ALOGD("writerTask before waitForSpace");
|
||||
queue->waitForSpace(&queue_mutex);
|
||||
ALOGD("writerTask after waitForSpace");
|
||||
int bufferSize = queue->getWritableRegion(SENSOR_EVENT_QUEUE_CAPACITY, &buffer);
|
||||
// Do blocking poll outside of lock
|
||||
pthread_mutex_unlock(&queue_mutex);
|
||||
|
||||
ALOGD("writerTask before poll() - bufferSize = %d", bufferSize);
|
||||
eventsPolled = device->poll(device, buffer, bufferSize);
|
||||
ALOGD("writerTask poll() got %d events.", eventsPolled);
|
||||
|
||||
ALOGD("writerTask before lock 2");
|
||||
pthread_mutex_lock(&queue_mutex);
|
||||
queue->markAsWritten(eventsPolled);
|
||||
ALOGD("writerTask wrote %d events", eventsPolled);
|
||||
if (waiting_for_data) {
|
||||
ALOGD("writerTask - broadcast data_available_cond");
|
||||
pthread_cond_broadcast(&data_available_cond);
|
||||
}
|
||||
pthread_mutex_unlock(&queue_mutex);
|
||||
}
|
||||
// never actually returns
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Cache of all sensors, with original handles replaced by global handles.
|
||||
@@ -124,15 +172,31 @@ struct sensors_poll_context_t {
|
||||
int close();
|
||||
|
||||
std::vector<hw_device_t*> sub_hw_devices;
|
||||
std::vector<SensorEventQueue*> queues;
|
||||
std::vector<pthread_t> threads;
|
||||
int nextReadIndex;
|
||||
|
||||
sensors_poll_device_t* get_v0_device_by_handle(int global_handle);
|
||||
sensors_poll_device_1_t* get_v1_device_by_handle(int global_handle);
|
||||
int get_device_version_by_handle(int global_handle);
|
||||
|
||||
void copy_event_remap_handle(sensors_event_t* src, sensors_event_t* dest, int sub_index);
|
||||
};
|
||||
|
||||
void sensors_poll_context_t::addSubHwDevice(struct hw_device_t* sub_hw_device) {
|
||||
ALOGD("addSubHwDevice");
|
||||
this->sub_hw_devices.push_back(sub_hw_device);
|
||||
|
||||
SensorEventQueue *queue = new SensorEventQueue(SENSOR_EVENT_QUEUE_CAPACITY);
|
||||
this->queues.push_back(queue);
|
||||
|
||||
TaskContext* taskContext = new TaskContext();
|
||||
taskContext->device = (sensors_poll_device_t*) sub_hw_device;
|
||||
taskContext->queue = queue;
|
||||
|
||||
pthread_t writerThread;
|
||||
pthread_create(&writerThread, NULL, writerTask, taskContext);
|
||||
this->threads.push_back(writerThread);
|
||||
}
|
||||
|
||||
sensors_poll_device_t* sensors_poll_context_t::get_v0_device_by_handle(int handle) {
|
||||
@@ -168,34 +232,60 @@ int sensors_poll_context_t::setDelay(int handle, int64_t ns) {
|
||||
return retval;
|
||||
}
|
||||
|
||||
int sensors_poll_context_t::poll(sensors_event_t *data, int count) {
|
||||
ALOGD("poll");
|
||||
|
||||
// This only gets the first device. Parallel polling of multiple devices is coming soon.
|
||||
int sub_index = 0;
|
||||
sensors_poll_device_t* v0 = (sensors_poll_device_t*) this->sub_hw_devices[sub_index];
|
||||
|
||||
ALOGD("poll's blocking read begins...");
|
||||
int retval = v0->poll(v0, data, count);
|
||||
ALOGD("...poll's blocking read ends");
|
||||
ALOGD("rewriting %d sensor handles...", retval);
|
||||
// A normal event's "sensor" field is a local handles. Convert it to a global handle.
|
||||
void sensors_poll_context_t::copy_event_remap_handle(sensors_event_t* dest, sensors_event_t* src,
|
||||
int sub_index) {
|
||||
memcpy(dest, src, sizeof(struct sensors_event_t));
|
||||
// A normal event's "sensor" field is a local handle. Convert it to a global handle.
|
||||
// A meta-data event must have its sensor set to 0, but it has a nested event
|
||||
// with a local handle that needs to be converted to a global handle.
|
||||
FullHandle full_handle;
|
||||
full_handle.moduleIndex = sub_index;
|
||||
for (int i = 0; i < retval; i++) {
|
||||
sensors_event_t *event = &data[i];
|
||||
// If it's a metadata event, rewrite the inner payload, not the sensor field.
|
||||
if (event->type == SENSOR_TYPE_META_DATA) {
|
||||
full_handle.localHandle = event->meta_data.sensor;
|
||||
event->meta_data.sensor = full_to_global[full_handle];
|
||||
} else {
|
||||
full_handle.localHandle = event->sensor;
|
||||
event->sensor = full_to_global[full_handle];
|
||||
// If it's a metadata event, rewrite the inner payload, not the sensor field.
|
||||
if (dest->type == SENSOR_TYPE_META_DATA) {
|
||||
full_handle.localHandle = dest->meta_data.sensor;
|
||||
dest->meta_data.sensor = full_to_global[full_handle];
|
||||
} else {
|
||||
full_handle.localHandle = dest->sensor;
|
||||
dest->sensor = full_to_global[full_handle];
|
||||
}
|
||||
}
|
||||
|
||||
int sensors_poll_context_t::poll(sensors_event_t *data, int maxReads) {
|
||||
ALOGD("poll");
|
||||
int empties = 0;
|
||||
int queueCount = (int)this->queues.size();
|
||||
int eventsRead = 0;
|
||||
|
||||
pthread_mutex_lock(&queue_mutex);
|
||||
while (eventsRead == 0) {
|
||||
while (empties < queueCount && eventsRead < maxReads) {
|
||||
SensorEventQueue* queue = this->queues.at(this->nextReadIndex);
|
||||
ALOGD("queue size: %d", queue->getSize());
|
||||
sensors_event_t* event = queue->peek();
|
||||
if (event == NULL) {
|
||||
empties++;
|
||||
} else {
|
||||
empties = 0;
|
||||
this->copy_event_remap_handle(&data[eventsRead++], event, nextReadIndex);
|
||||
queue->dequeue();
|
||||
}
|
||||
this->nextReadIndex = (this->nextReadIndex + 1) % queueCount;
|
||||
}
|
||||
if (eventsRead == 0) {
|
||||
// The queues have been scanned and none contain data.
|
||||
// Wait for any of them to signal that there's data.
|
||||
ALOGD("poll stopping to wait for data");
|
||||
waiting_for_data = true;
|
||||
pthread_cond_wait(&data_available_cond, &queue_mutex);
|
||||
waiting_for_data = false;
|
||||
empties = 0;
|
||||
ALOGD("poll done waiting for data");
|
||||
}
|
||||
}
|
||||
return retval;
|
||||
pthread_mutex_unlock(&queue_mutex);
|
||||
ALOGD("...poll's blocking read ends. Returning %d events.", eventsRead);
|
||||
|
||||
return eventsRead;
|
||||
}
|
||||
|
||||
int sensors_poll_context_t::batch(int handle, int flags, int64_t period_ns, int64_t timeout) {
|
||||
@@ -436,9 +526,13 @@ static void lazy_init_sensors_list() {
|
||||
|
||||
static int module__get_sensors_list(struct sensors_module_t* module,
|
||||
struct sensor_t const** list) {
|
||||
ALOGD("module__get_sensors_list");
|
||||
ALOGD("module__get_sensors_list start");
|
||||
lazy_init_sensors_list();
|
||||
*list = global_sensors_list;
|
||||
ALOGD("global_sensors_count: %d", global_sensors_count);
|
||||
for (int i = 0; i < global_sensors_count; i++) {
|
||||
ALOGD("sensor type: %d", global_sensors_list[i].type);
|
||||
}
|
||||
return global_sensors_count;
|
||||
}
|
||||
|
||||
@@ -480,6 +574,8 @@ static int open_sensors(const struct hw_module_t* hw_module, const char* name,
|
||||
dev->proxy_device.batch = device__batch;
|
||||
dev->proxy_device.flush = device__flush;
|
||||
|
||||
dev->nextReadIndex = 0;
|
||||
|
||||
// Open() the subhal modules. Remember their devices in a vector parallel to sub_hw_modules.
|
||||
for (std::vector<hw_module_t*>::iterator it = sub_hw_modules->begin();
|
||||
it != sub_hw_modules->end(); it++) {
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
#include <stdlib.h>
|
||||
#include <hardware/sensors.h>
|
||||
#include <pthread.h>
|
||||
#include <cutils/atomic.h>
|
||||
|
||||
#include "SensorEventQueue.cpp"
|
||||
|
||||
// Unit tests for the SensorEventQueue.
|
||||
@@ -78,93 +80,9 @@ bool testWrappingWriteSizeCounts() {
|
||||
return true;
|
||||
}
|
||||
|
||||
static const int TTOQ_EVENT_COUNT = 10000;
|
||||
|
||||
struct TaskContext {
|
||||
bool success;
|
||||
SensorEventQueue* queue;
|
||||
};
|
||||
|
||||
void* writerTask(void* ptr) {
|
||||
printf("writerTask starts\n");
|
||||
TaskContext* ctx = (TaskContext*)ptr;
|
||||
SensorEventQueue* queue = ctx->queue;
|
||||
int totalWrites = 0;
|
||||
sensors_event_t* buffer;
|
||||
while (totalWrites < TTOQ_EVENT_COUNT) {
|
||||
queue->waitForSpaceAndLock();
|
||||
int writableSize = queue->getWritableRegion(rand() % 10 + 1, &buffer);
|
||||
queue->unlock();
|
||||
for (int i = 0; i < writableSize; i++) {
|
||||
// serialize the events
|
||||
buffer[i].timestamp = totalWrites++;
|
||||
}
|
||||
queue->lock();
|
||||
queue->markAsWritten(writableSize);
|
||||
queue->unlock();
|
||||
}
|
||||
printf("writerTask ends normally\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void* readerTask(void* ptr) {
|
||||
printf("readerTask starts\n");
|
||||
TaskContext* ctx = (TaskContext*)ptr;
|
||||
SensorEventQueue* queue = ctx->queue;
|
||||
int totalReads = 0;
|
||||
while (totalReads < TTOQ_EVENT_COUNT) {
|
||||
queue->waitForDataAndLock();
|
||||
int maxReads = rand() % 20 + 1;
|
||||
int reads = 0;
|
||||
while (queue->getSize() && reads < maxReads) {
|
||||
sensors_event_t* event = queue->peek();
|
||||
if (totalReads != event->timestamp) {
|
||||
printf("FAILURE: readerTask expected timestamp %d; actual was %d\n",
|
||||
totalReads, (int)(event->timestamp));
|
||||
ctx->success = false;
|
||||
return NULL;
|
||||
}
|
||||
queue->dequeue();
|
||||
totalReads++;
|
||||
reads++;
|
||||
}
|
||||
queue->unlock();
|
||||
}
|
||||
printf("readerTask ends normally\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
// Create a short queue, and write and read a ton of data through it.
|
||||
// Write serial timestamps into the events, and expect to read them in the right order.
|
||||
bool testTwoThreadsOneQueue() {
|
||||
printf("TEST testTwoThreadsOneQueue\n");
|
||||
SensorEventQueue* queue = new SensorEventQueue(100);
|
||||
|
||||
TaskContext readerCtx;
|
||||
readerCtx.success = true;
|
||||
readerCtx.queue = queue;
|
||||
|
||||
TaskContext writerCtx;
|
||||
writerCtx.success = true;
|
||||
writerCtx.queue = queue;
|
||||
|
||||
pthread_t writer, reader;
|
||||
pthread_create(&reader, NULL, readerTask, &readerCtx);
|
||||
pthread_create(&writer, NULL, writerTask, &writerCtx);
|
||||
|
||||
pthread_join(writer, NULL);
|
||||
pthread_join(reader, NULL);
|
||||
|
||||
printf("testTwoThreadsOneQueue done\n");
|
||||
return readerCtx.success && writerCtx.success;
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
if (testSimpleWriteSizeCounts() &&
|
||||
testWrappingWriteSizeCounts() &&
|
||||
testTwoThreadsOneQueue()) {
|
||||
testWrappingWriteSizeCounts()) {
|
||||
printf("ALL PASSED\n");
|
||||
} else {
|
||||
printf("SOMETHING FAILED\n");
|
||||
|
||||
Reference in New Issue
Block a user