1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.Executor;
29
30 import org.apache.mina.core.RuntimeIoException;
31 import org.apache.mina.core.buffer.IoBuffer;
32 import org.apache.mina.core.file.FileRegion;
33 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
34 import org.apache.mina.core.session.SessionState;
35 import org.apache.mina.util.CircularQueue;
36 import org.apache.tomcat.jni.Poll;
37 import org.apache.tomcat.jni.Pool;
38 import org.apache.tomcat.jni.Socket;
39 import org.apache.tomcat.jni.Status;
40
41
42
43
44
45
46 public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
47 private static final int POLLSET_SIZE = 1024;
48
49 private final Map<Long, AprSession> allSessions =
50 new HashMap<Long, AprSession>(POLLSET_SIZE);
51
52 private final Object wakeupLock = new Object();
53 private final long wakeupSocket;
54 private volatile boolean toBeWakenUp;
55
56 private final long pool;
57 private final long bufferPool;
58 private final long pollset;
59 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
60 private final List<AprSession> polledSessions =
61 new CircularQueue<AprSession>(POLLSET_SIZE);
62
63
64
65
66
67
68
69 public AprIoProcessor(Executor executor) {
70 super(executor);
71
72
73 pool = Pool.create(AprLibrary.getInstance().getRootPool());
74 bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
75
76 try {
77 wakeupSocket = Socket.create(
78 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
79 } catch (RuntimeException e) {
80 throw e;
81 } catch (Error e) {
82 throw e;
83 } catch (Exception e) {
84 throw new RuntimeIoException("Failed to create a wakeup socket.", e);
85 }
86
87 boolean success = false;
88 long newPollset;
89 try {
90 newPollset = Poll.create(
91 POLLSET_SIZE,
92 pool,
93 Poll.APR_POLLSET_THREADSAFE,
94 Long.MAX_VALUE);
95
96 if (newPollset == 0) {
97 newPollset = Poll.create(
98 62,
99 pool,
100 Poll.APR_POLLSET_THREADSAFE,
101 Long.MAX_VALUE);
102 }
103
104 pollset = newPollset;
105 if (pollset < 0) {
106 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
107 throw new RuntimeIoException(
108 "Thread-safe pollset is not supported in this platform.");
109 }
110 }
111 success = true;
112 } catch (RuntimeException e) {
113 throw e;
114 } catch (Error e) {
115 throw e;
116 } catch (Exception e) {
117 throw new RuntimeIoException("Failed to create a pollset.", e);
118 } finally {
119 if (!success) {
120 dispose();
121 }
122 }
123 }
124
125
126
127
128 @Override
129 protected void dispose0() {
130 Poll.destroy(pollset);
131 Socket.close(wakeupSocket);
132 Pool.destroy(bufferPool);
133 Pool.destroy(pool);
134 }
135
136
137
138
139 @Override
140 protected int select() throws Exception {
141 return select(Integer.MAX_VALUE);
142 }
143
144
145
146
147 @Override
148 protected int select(long timeout) throws Exception {
149 int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
150 if (rv <= 0) {
151 if (rv != -120001) {
152 throwException(rv);
153 }
154
155 rv = Poll.maintain(pollset, polledSockets, true);
156 if (rv > 0) {
157 for (int i = 0; i < rv; i ++) {
158 long socket = polledSockets[i];
159 AprSession session = allSessions.get(socket);
160 if (session == null) {
161 continue;
162 }
163
164 int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
165 (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
166
167 Poll.add(pollset, socket, flag);
168 }
169 } else if (rv < 0) {
170 throwException(rv);
171 }
172
173 return 0;
174 } else {
175 rv <<= 1;
176 if (!polledSessions.isEmpty()) {
177 polledSessions.clear();
178 }
179 for (int i = 0; i < rv; i ++) {
180 long flag = polledSockets[i];
181 long socket = polledSockets[++i];
182 if (socket == wakeupSocket) {
183 synchronized (wakeupLock) {
184 Poll.remove(pollset, wakeupSocket);
185 toBeWakenUp = false;
186 }
187 continue;
188 }
189 AprSession session = allSessions.get(socket);
190 if (session == null) {
191 continue;
192 }
193
194 session.setReadable((flag & Poll.APR_POLLIN) != 0);
195 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
196
197 polledSessions.add(session);
198 }
199
200 return polledSessions.size();
201 }
202 }
203
204
205
206
207 @Override
208 protected boolean isSelectorEmpty() {
209 return allSessions.isEmpty();
210 }
211
212
213
214
215 @Override
216 protected void wakeup() {
217 if (toBeWakenUp) {
218 return;
219 }
220
221
222 synchronized (wakeupLock) {
223 toBeWakenUp = true;
224 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
225 }
226 }
227
228
229
230
231 @Override
232 protected Iterator<AprSession> allSessions() {
233 return allSessions.values().iterator();
234 }
235
236
237
238
239 @Override
240 protected Iterator<AprSession> selectedSessions() {
241 return polledSessions.iterator();
242 }
243
244 @Override
245 protected void init(AprSession session) throws Exception {
246 long s = session.getDescriptor();
247 Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
248 Socket.timeoutSet(s, 0);
249
250 int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
251 if (rv != Status.APR_SUCCESS) {
252 throwException(rv);
253 }
254
255 session.setInterestedInRead(true);
256 allSessions.put(s, session);
257 }
258
259
260
261
262 @Override
263 protected void destroy(AprSession session) throws Exception {
264 if (allSessions.remove(session.getDescriptor()) == null) {
265
266 return;
267 }
268
269 int ret = Poll.remove(pollset, session.getDescriptor());
270 try {
271 if (ret != Status.APR_SUCCESS) {
272 throwException(ret);
273 }
274 } finally {
275 ret = Socket.close(session.getDescriptor());
276
277
278
279 Socket.destroy(session.getDescriptor());
280 session.setDescriptor(0);
281
282 if (ret != Status.APR_SUCCESS) {
283 throwException(ret);
284 }
285 }
286 }
287
288
289
290
291 @Override
292 protected SessionState getState(AprSession session) {
293 long socket = session.getDescriptor();
294
295 if (socket != 0) {
296 return SessionState.OPENED;
297 } else if (allSessions.get(socket) != null) {
298 return SessionState.OPENING;
299 } else {
300 return SessionState.CLOSING;
301 }
302 }
303
304
305
306
307 @Override
308 protected boolean isReadable(AprSession session) {
309 return session.isReadable();
310 }
311
312
313
314
315 @Override
316 protected boolean isWritable(AprSession session) {
317 return session.isWritable();
318 }
319
320
321
322
323 @Override
324 protected boolean isInterestedInRead(AprSession session) {
325 return session.isInterestedInRead();
326 }
327
328
329
330
331 @Override
332 protected boolean isInterestedInWrite(AprSession session) {
333 return session.isInterestedInWrite();
334 }
335
336
337
338
339 @Override
340 protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception {
341 if (session.isInterestedInRead() == isInterested) {
342 return;
343 }
344
345 int rv = Poll.remove(pollset, session.getDescriptor());
346 if (rv != Status.APR_SUCCESS) {
347 throwException(rv);
348 }
349
350 int flags = (isInterested ? Poll.APR_POLLIN : 0)
351 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
352
353 rv = Poll.add(pollset, session.getDescriptor(), flags);
354 if (rv == Status.APR_SUCCESS) {
355 session.setInterestedInRead(isInterested);
356 } else {
357 throwException(rv);
358 }
359 }
360
361
362
363
364 @Override
365 protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception {
366 if (session.isInterestedInWrite() == isInterested) {
367 return;
368 }
369
370 int rv = Poll.remove(pollset, session.getDescriptor());
371 if (rv != Status.APR_SUCCESS) {
372 throwException(rv);
373 }
374
375 int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
376 | (isInterested ? Poll.APR_POLLOUT : 0);
377
378 rv = Poll.add(pollset, session.getDescriptor(), flags);
379 if (rv == Status.APR_SUCCESS) {
380 session.setInterestedInWrite(isInterested);
381 } else {
382 throwException(rv);
383 }
384 }
385
386
387
388
389 @Override
390 protected int read(AprSession session, IoBuffer buffer) throws Exception {
391 int bytes;
392 int capacity = buffer.remaining();
393
394 ByteBuffer b = Pool.alloc(bufferPool, capacity);
395
396 try {
397 bytes = Socket.recvb(
398 session.getDescriptor(), b, 0, capacity);
399
400 if (bytes > 0) {
401 b.position(0);
402 b.limit(bytes);
403 buffer.put(b);
404 } else if (bytes < 0) {
405 if (Status.APR_STATUS_IS_EOF(-bytes)) {
406 bytes = -1;
407 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
408 bytes = 0;
409 } else {
410 throwException(bytes);
411 }
412 }
413 } finally {
414 Pool.clear(bufferPool);
415 }
416
417 return bytes;
418 }
419
420
421
422
423 @Override
424 protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
425 int writtenBytes;
426 if (buf.isDirect()) {
427 writtenBytes = Socket.sendb(
428 session.getDescriptor(), buf.buf(), buf.position(), length);
429 } else {
430 writtenBytes = Socket.send(
431 session.getDescriptor(), buf.array(), buf.position(), length);
432 if (writtenBytes > 0) {
433 buf.skip(writtenBytes);
434 }
435 }
436
437 if (writtenBytes < 0) {
438 if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
439 writtenBytes = 0;
440 } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
441 writtenBytes = 0;
442 } else {
443 throwException(writtenBytes);
444 }
445 }
446 return writtenBytes;
447 }
448
449
450
451
452 @Override
453 protected int transferFile(AprSession session, FileRegion region, int length)
454 throws Exception {
455 throw new UnsupportedOperationException();
456 }
457
458 private void throwException(int code) throws IOException {
459 throw new IOException(
460 org.apache.tomcat.jni.Error.strerror(-code) +
461 " (code: " + code + ")");
462 }
463 }