1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
|
//
// Copyright 2010 The Android Open Source Project
//
// A looper implementation based on epoll().
//
#define LOG_TAG "Looper"
//#define LOG_NDEBUG 0
// Debugs poll and wake interactions.
#define DEBUG_POLL_AND_WAKE 0
// Debugs callback registration and invocation.
#define DEBUG_CALLBACKS 0
#include <cutils/log.h>
#include <utils/Looper.h>
#include <utils/Timers.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
namespace android {
// Hint for number of file descriptors to be associated with the epoll instance.
static const int EPOLL_SIZE_HINT = 8;
// Maximum number of file descriptors for which to retrieve poll events each iteration.
static const int EPOLL_MAX_EVENTS = 16;
static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey = 0;
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks),
mResponseIndex(0) {
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
int wakeFds[2];
int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
errno);
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno);
}
Looper::~Looper() {
close(mWakeReadPipeFd);
close(mWakeWritePipeFd);
close(mEpollFd);
}
void Looper::initTLSKey() {
int result = pthread_key_create(& gTLSKey, threadDestructor);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
}
void Looper::threadDestructor(void *st) {
Looper* const self = static_cast<Looper*>(st);
if (self != NULL) {
self->decStrong((void*)threadDestructor);
}
}
void Looper::setForThread(const sp<Looper>& looper) {
sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
if (looper != NULL) {
looper->incStrong((void*)threadDestructor);
}
pthread_setspecific(gTLSKey, looper.get());
if (old != NULL) {
old->decStrong((void*)threadDestructor);
}
}
sp<Looper> Looper::getForThread() {
int result = pthread_once(& gTLSOnce, initTLSKey);
LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
return (Looper*)pthread_getspecific(gTLSKey);
}
sp<Looper> Looper::prepare(int opts) {
bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
sp<Looper> looper = Looper::getForThread();
if (looper == NULL) {
looper = new Looper(allowNonCallbacks);
Looper::setForThread(looper);
}
if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
LOGW("Looper already prepared for this thread with a different value for the "
"ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
}
return looper;
}
bool Looper::getAllowNonCallbacks() const {
return mAllowNonCallbacks;
}
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
if (! response.request.callback) {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p", this,
response.request.ident, response.request.fd,
response.events, response.request.data);
#endif
if (outFd != NULL) *outFd = response.request.fd;
if (outEvents != NULL) *outEvents = response.events;
if (outData != NULL) *outData = response.request.data;
return response.request.ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = NULL;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);
}
}
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
if (eventCount < 0) {
if (errno == EINTR) {
return ALOOPER_POLL_WAKE;
}
LOGW("Poll failed with an unexpected error, errno=%d", errno);
return ALOOPER_POLL_ERROR;
}
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - timeout", this);
#endif
return ALOOPER_POLL_TIMEOUT;
}
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
bool acquiredLock = false;
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - awoken", this);
#endif
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
} else {
LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
if (! acquiredLock) {
mLock.lock();
acquiredLock = true;
}
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
Response response;
response.events = events;
response.request = mRequests.valueAt(requestIndex);
mResponses.push(response);
} else {
LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
if (acquiredLock) {
mLock.unlock();
}
for (size_t i = 0; i < mResponses.size(); i++) {
const Response& response = mResponses.itemAt(i);
if (response.request.callback) {
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
response.request.fd, response.events, response.request.data);
#endif
int callbackResult = response.request.callback(
response.request.fd, response.events, response.request.data);
if (callbackResult == 0) {
removeFd(response.request.fd);
}
result = ALOOPER_POLL_CALLBACK;
}
}
return result;
}
int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
if (timeoutMillis <= 0) {
int result;
do {
result = pollOnce(timeoutMillis, outFd, outEvents, outData);
} while (result == ALOOPER_POLL_CALLBACK);
return result;
} else {
nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
+ milliseconds_to_nanoseconds(timeoutMillis);
for (;;) {
int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
if (result != ALOOPER_POLL_CALLBACK) {
return result;
}
nsecs_t timeoutNanos = endTime - systemTime(SYSTEM_TIME_MONOTONIC);
if (timeoutNanos <= 0) {
return ALOOPER_POLL_TIMEOUT;
}
timeoutMillis = int(nanoseconds_to_milliseconds(timeoutNanos + 999999LL));
}
}
}
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ wake", this);
#endif
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
LOGW("Could not write wake signal, errno=%d", errno);
}
}
}
int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
#if DEBUG_CALLBACKS
LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback, data);
#endif
int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
if (events & ALOOPER_EVENT_ERROR) epollEvents |= EPOLLERR;
if (events & ALOOPER_EVENT_HANGUP) epollEvents |= EPOLLHUP;
if (epollEvents == 0) {
LOGE("Invalid attempt to set a callback with no selected poll events.");
return -1;
}
if (! callback) {
if (! mAllowNonCallbacks) {
LOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) {
LOGE("Invalid attempt to set NULL callback with ident <= 0.");
return -1;
}
}
{ // acquire lock
AutoMutex _l(mLock);
Request request;
request.fd = fd;
request.ident = ident;
request.callback = callback;
request.data = data;
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = epollEvents;
eventItem.data.fd = fd;
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
LOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
if (epollResult < 0) {
LOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
int Looper::removeFd(int fd) {
#if DEBUG_CALLBACKS
LOGD("%p ~ removeFd - fd=%d", this, fd);
#endif
{ // acquire lock
AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
return 0;
}
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
if (epollResult < 0) {
LOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.removeItemsAt(requestIndex);
} // request lock
return 1;
}
} // namespace android
|