KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > core > TCCommJDK14


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.net.core;
6
7 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
8
9 import com.tc.exception.TCInternalError;
10 import com.tc.net.NIOWorkarounds;
11 import com.tc.net.core.event.TCListenerEvent;
12 import com.tc.util.Assert;
13 import com.tc.util.Util;
14 import com.tc.util.runtime.Os;
15
16 import java.io.IOException JavaDoc;
17 import java.net.Socket JavaDoc;
18 import java.nio.channels.CancelledKeyException JavaDoc;
19 import java.nio.channels.Channel JavaDoc;
20 import java.nio.channels.ClosedChannelException JavaDoc;
21 import java.nio.channels.GatheringByteChannel JavaDoc;
22 import java.nio.channels.ScatteringByteChannel JavaDoc;
23 import java.nio.channels.SelectableChannel JavaDoc;
24 import java.nio.channels.SelectionKey JavaDoc;
25 import java.nio.channels.Selector JavaDoc;
26 import java.nio.channels.ServerSocketChannel JavaDoc;
27 import java.nio.channels.SocketChannel JavaDoc;
28 import java.util.HashSet JavaDoc;
29 import java.util.Iterator JavaDoc;
30 import java.util.Random JavaDoc;
31 import java.util.Set JavaDoc;
32
/**
 * JDK 1.4 (NIO) version of TCComm. Uses a single internal thread and a selector to manage the channels associated
 * with <code>TCConnection</code> instances.
 *
 * @author teck
 */

39 class TCCommJDK14 extends AbstractTCComm {
40
41   TCCommJDK14() {
42     // nada
43
}
44
45   protected void startImpl() {
46     this.selector = null;
47
48     final int tries = 3;
49
50     for (int i = 0; i < tries; i++) {
51       try {
52         this.selector = Selector.open();
53         break;
54       } catch (IOException JavaDoc ioe) {
55         throw new RuntimeException JavaDoc(ioe);
56       } catch (NullPointerException JavaDoc npe) {
57         if (i < tries && NIOWorkarounds.selectorOpenRace(npe)) {
58           System.err.println("Attempting to work around sun bug 6427854 (attempt " + (i + 1) + " of " + tries + ")");
59           try {
60             Thread.sleep(new Random JavaDoc().nextInt(20) + 5);
61           } catch (InterruptedException JavaDoc ie) {
62             //
63
}
64           continue;
65         }
66         throw npe;
67       }
68     }
69
70     if (this.selector == null) { throw new RuntimeException JavaDoc("Could not start selector"); }
71
72     commThread = new TCCommThread(this);
73     commThread.start();
74   }
75
76   protected void stopImpl() {
77     try {
78       if (selector != null) {
79         selector.wakeup();
80       }
81     } catch (Exception JavaDoc e) {
82       logger.error("Exception trying to stop TCComm", e);
83     }
84   }
85
86   void addSelectorTask(final Runnable JavaDoc task) {
87     Assert.eval(!isCommThread());
88     boolean isInterrupted = false;
89
90     try {
91       while (true) {
92         try {
93           selectorTasks.put(task);
94           break;
95         } catch (InterruptedException JavaDoc e) {
96           logger.warn(e);
97           isInterrupted = true;
98         }
99       }
100     } finally {
101       selector.wakeup();
102     }
103     Util.selfInterruptIfNeeded(isInterrupted);
104   }
105
106   void stopListener(final ServerSocketChannel JavaDoc ssc, final Runnable JavaDoc callback) {
107     if (!isCommThread()) {
108       Runnable JavaDoc task = new Runnable JavaDoc() {
109         public void run() {
110           TCCommJDK14.this.stopListener(ssc, callback);
111         }
112       };
113       addSelectorTask(task);
114       return;
115     }
116
117     try {
118       cleanupChannel(ssc, null);
119     } catch (Exception JavaDoc e) {
120       logger.error(e);
121     } finally {
122       try {
123         callback.run();
124       } catch (Exception JavaDoc e) {
125         logger.error(e);
126       }
127     }
128   }
129
130   void unregister(SelectableChannel JavaDoc channel) {
131     Assert.assertTrue(isCommThread());
132     SelectionKey JavaDoc key = channel.keyFor(selector);
133     if (key != null) {
134       key.cancel();
135       key.attach(null);
136     }
137   }
138
139   void cleanupChannel(final Channel JavaDoc ch, final Runnable JavaDoc callback) {
140     if (null == ch) {
141       // not expected
142
logger.warn("null channel passed to cleanupChannel()", new Throwable JavaDoc());
143       return;
144     }
145
146     if (!isCommThread()) {
147       if (logger.isDebugEnabled()) {
148         logger.debug("queue'ing channel close operation");
149       }
150
151       addSelectorTask(new Runnable JavaDoc() {
152         public void run() {
153           TCCommJDK14.this.cleanupChannel(ch, callback);
154         }
155       });
156       return;
157     }
158
159     try {
160       if (ch instanceof SelectableChannel JavaDoc) {
161         SelectableChannel JavaDoc sc = (SelectableChannel JavaDoc) ch;
162
163         try {
164           SelectionKey JavaDoc sk = sc.keyFor(selector);
165           if (sk != null) {
166             sk.attach(null);
167             sk.cancel();
168           }
169         } catch (Exception JavaDoc e) {
170           logger.error("Exception trying to clear selection key", e);
171         }
172       }
173
174       if (ch instanceof SocketChannel JavaDoc) {
175         SocketChannel JavaDoc sc = (SocketChannel JavaDoc) ch;
176
177         Socket JavaDoc s = sc.socket();
178
179         if (null != s) {
180           synchronized (s) {
181
182             if (s.isConnected()) {
183               try {
184                 if (!s.isOutputShutdown()) {
185                   s.shutdownOutput();
186                 }
187               } catch (Exception JavaDoc e) {
188                 logger.error("Exception trying to shutdown socket output: " + e.getMessage());
189               }
190
191               try {
192                 if (!s.isClosed()) {
193                   s.close();
194                 }
195               } catch (Exception JavaDoc e) {
196                 logger.error("Exception trying to close() socket: " + e.getMessage());
197               }
198             }
199           }
200         }
201       } else if (ch instanceof ServerSocketChannel JavaDoc) {
202         ServerSocketChannel JavaDoc ssc = (ServerSocketChannel JavaDoc) ch;
203
204         try {
205           ssc.close();
206         } catch (Exception JavaDoc e) {
207           logger.error("Exception trying to close() server socket" + e.getMessage());
208         }
209       }
210
211       try {
212         ch.close();
213       } catch (Exception JavaDoc e) {
214         logger.error("Exception trying to close channel", e);
215       }
216     } catch (Exception JavaDoc e) {
217       // this is just a catch all to make sure that no exceptions will be thrown by this method, please do not remove
218
logger.error("Unhandled exception in cleanupChannel()", e);
219     } finally {
220       try {
221         if (callback != null) {
222           callback.run();
223         }
224       } catch (Throwable JavaDoc t) {
225         logger.error("Unhandled exception in cleanupChannel callback.", t);
226       }
227     }
228
229   }
230
231   void requestConnectInterest(TCConnectionJDK14 conn, SocketChannel JavaDoc sc) {
232     handleRequest(InterestRequest.createSetInterestRequest(sc, conn, SelectionKey.OP_CONNECT));
233   }
234
235   void requestReadInterest(TCJDK14ChannelReader reader, ScatteringByteChannel JavaDoc channel) {
236     handleRequest(InterestRequest.createAddInterestRequest((SelectableChannel JavaDoc) channel, reader, SelectionKey.OP_READ));
237   }
238
239   void requestWriteInterest(TCJDK14ChannelWriter writer, GatheringByteChannel JavaDoc channel) {
240     handleRequest(InterestRequest.createAddInterestRequest((SelectableChannel JavaDoc) channel, writer, SelectionKey.OP_WRITE));
241   }
242
243   void requestAcceptInterest(TCListenerJDK14 lsnr, ServerSocketChannel JavaDoc ssc) {
244     handleRequest(InterestRequest.createSetInterestRequest(ssc, lsnr, SelectionKey.OP_ACCEPT));
245   }
246
247   void removeWriteInterest(TCConnectionJDK14 conn, SelectableChannel JavaDoc channel) {
248     handleRequest(InterestRequest.createRemoveInterestRequest(channel, conn, SelectionKey.OP_WRITE));
249   }
250
251   void removeReadInterest(TCConnectionJDK14 conn, SelectableChannel JavaDoc channel) {
252     handleRequest(InterestRequest.createRemoveInterestRequest(channel, conn, SelectionKey.OP_READ));
253   }
254
255   public void closeEvent(TCListenerEvent event) {
256     commThread.listenerAdded(event.getSource());
257   }
258
259   void listenerAdded(TCListener listener) {
260     commThread.listenerAdded(listener);
261   }
262
263   private void handleRequest(final InterestRequest req) {
264     // ignore the request if we are stopped/stopping
265
if (isStopped()) { return; }
266
267     if (isCommThread()) {
268       modifyInterest(req);
269     } else {
270       addSelectorTask(new Runnable JavaDoc() {
271         public void run() {
272           TCCommJDK14.this.handleRequest(req);
273         }
274       });
275       return;
276     }
277   }
278
279   void selectLoop() throws IOException JavaDoc {
280     Assert.assertNotNull("selector", selector);
281     Assert.eval("Not started", isStarted());
282
283     while (true) {
284       final int numKeys;
285       try {
286         numKeys = selector.select();
287       } catch (IOException JavaDoc ioe) {
288         if (NIOWorkarounds.linuxSelectWorkaround(ioe)) {
289           logger.warn("working around Sun bug 4504001");
290           continue;
291         }
292         throw ioe;
293       }
294
295       if (isStopped()) {
296         if (logger.isDebugEnabled()) {
297           logger.debug("Select loop terminating");
298         }
299         return;
300       }
301
302       boolean isInterrupted = false;
303       // run any pending selector tasks
304
while (true) {
305         Runnable JavaDoc task = null;
306         while (true) {
307           try {
308             task = (Runnable JavaDoc) selectorTasks.poll(0);
309             break;
310           } catch (InterruptedException JavaDoc ie) {
311             logger.error("Error getting task from task queue", ie);
312             isInterrupted = true;
313           }
314         }
315
316         if (null == task) {
317           break;
318         }
319
320         try {
321           task.run();
322         } catch (Exception JavaDoc e) {
323           logger.error("error running selector task", e);
324         }
325       }
326       Util.selfInterruptIfNeeded(isInterrupted);
327
328       final Set JavaDoc selectedKeys = selector.selectedKeys();
329       if ((0 == numKeys) && (0 == selectedKeys.size())) {
330         continue;
331       }
332
333       for (Iterator JavaDoc iter = selectedKeys.iterator(); iter.hasNext();) {
334         SelectionKey JavaDoc key = (SelectionKey JavaDoc) iter.next();
335         iter.remove();
336
337         if (null == key) {
338           logger.error("Selection key is null");
339           continue;
340         }
341
342         try {
343           if (key.isAcceptable()) {
344             doAccept(key);
345             continue;
346           }
347
348           if (key.isConnectable()) {
349             doConnect(key);
350             continue;
351           }
352
353           if (key.isReadable()) {
354             ((TCJDK14ChannelReader) key.attachment()).doRead((ScatteringByteChannel JavaDoc) key.channel());
355           }
356
357           if (key.isValid() && key.isWritable()) {
358             ((TCJDK14ChannelWriter) key.attachment()).doWrite((GatheringByteChannel JavaDoc) key.channel());
359           }
360         } catch (CancelledKeyException JavaDoc cke) {
361           logger.warn(cke.getClass().getName() + " occured");
362         }
363       } // for
364
} // while (true)
365
}
366
367   private void dispose() {
368     if (selector != null) {
369
370       for (Iterator JavaDoc keys = selector.keys().iterator(); keys.hasNext();) {
371         try {
372           SelectionKey JavaDoc key = (SelectionKey JavaDoc) keys.next();
373           cleanupChannel(key.channel(), null);
374         }
375
376         catch (Exception JavaDoc e) {
377           logger.warn("Exception trying to close channel", e);
378         }
379       }
380
381       try {
382         selector.close();
383       } catch (Exception JavaDoc e) {
384         if ((Os.isMac()) && (Os.isUnix()) && (e.getMessage().equals("Bad file descriptor"))) {
385           // I can't find a specific bug about this, but I also can't seem to prevent the exception on the Mac.
386
// So just logging this as warning.
387
logger.warn("Exception trying to close selector: " + e.getMessage());
388         } else {
389           logger.error("Exception trying to close selector", e);
390         }
391       }
392     }
393
394     // drop any old selector tasks
395
selectorTasks = new LinkedQueue();
396   }
397
398   private boolean isCommThread() {
399     return isCommThread(Thread.currentThread());
400   }
401
402   private boolean isCommThread(Thread JavaDoc thread) {
403     if (thread == null) { return false; }
404     return thread == commThread;
405   }
406
407   private void doConnect(SelectionKey JavaDoc key) {
408     SocketChannel JavaDoc sc = (SocketChannel JavaDoc) key.channel();
409     TCConnectionJDK14 conn = (TCConnectionJDK14) key.attachment();
410
411     try {
412       if (sc.finishConnect()) {
413         sc.register(selector, SelectionKey.OP_READ, conn);
414         conn.finishConnect();
415       } else {
416         String JavaDoc errMsg = "finishConnect() returned false, but no exception thrown";
417
418         if (logger.isInfoEnabled()) {
419           logger.info(errMsg);
420         }
421
422         conn.fireErrorEvent(errMsg);
423       }
424     } catch (IOException JavaDoc ioe) {
425       if (logger.isInfoEnabled()) {
426         logger.info("IOException attempting to finish socket connection", ioe);
427       }
428
429       conn.fireErrorEvent(ioe, null);
430     }
431   }
432
/**
 * Applies an interest request to the selector by (re-)registering the request's channel. Must be called on the
 * comm thread (asserted).
 * <ul>
 * <li>add: the requested ops are OR-ed into the channel's current interest set</li>
 * <li>set: the interest set is replaced by the requested ops</li>
 * <li>remove: the requested ops are cleared from the current interest set</li>
 * </ul>
 * The remove case masks the bits out with <code>&amp; ~</code>. The earlier XOR toggled them, which switched an op
 * <em>on</em> whenever it was not already part of the interest set.
 * <p>
 * A closed channel or a cancelled key is logged as a warning and otherwise ignored.
 *
 * @param request the interest change to apply
 * @throws TCInternalError if the request is none of add, set or remove
 */
private void modifyInterest(InterestRequest request) {
  Assert.eval(isCommThread());

  try {
    final int existingOps;

    SelectionKey key = request.channel.keyFor(selector);
    if (key != null) {
      existingOps = key.interestOps();
    } else {
      // not registered with this selector yet
      existingOps = 0;
    }

    if (logger.isDebugEnabled()) {
      logger.debug(request);
    }

    if (request.add) {
      request.channel.register(selector, existingOps | request.interestOps, request.attachment);
    } else if (request.set) {
      request.channel.register(selector, request.interestOps, request.attachment);
    } else if (request.remove) {
      // clear (never toggle) the requested bits
      request.channel.register(selector, existingOps & ~request.interestOps, request.attachment);
    } else {
      throw new TCInternalError();
    }
  } catch (ClosedChannelException cce) {
    logger.warn("Exception trying to process interest request: " + cce);

  } catch (CancelledKeyException cke) {
    logger.warn("Exception trying to process interest request: " + cke);
  }
}
466
467   private void doAccept(final SelectionKey JavaDoc key) {
468     Assert.eval(isCommThread());
469
470     SocketChannel JavaDoc sc = null;
471
472     TCListenerJDK14 lsnr = (TCListenerJDK14) key.attachment();
473
474     try {
475       final ServerSocketChannel JavaDoc ssc = (ServerSocketChannel JavaDoc) key.channel();
476       sc = ssc.accept();
477       sc.configureBlocking(false);
478       final Socket JavaDoc s = sc.socket();
479
480       try {
481         s.setSendBufferSize(64 * 1024);
482       } catch (IOException JavaDoc ioe) {
483         logger.warn("IOException trying to setSendBufferSize()");
484       }
485
486       try {
487         s.setTcpNoDelay(true);
488       } catch (IOException JavaDoc ioe) {
489         logger.warn("IOException trying to setTcpNoDelay()", ioe);
490       }
491
492       TCConnectionJDK14 conn = lsnr.createConnection(sc);
493       sc.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, conn);
494     } catch (IOException JavaDoc ioe) {
495       if (logger.isInfoEnabled()) {
496         logger.info("IO Exception accepting new connection", ioe);
497       }
498
499       cleanupChannel(sc, null);
500     }
501   }
502
503   private Selector JavaDoc selector;
504   private TCCommThread commThread = null;
505   private LinkedQueue selectorTasks = new LinkedQueue();
506
507   private static class InterestRequest {
508     final SelectableChannel JavaDoc channel;
509     final Object JavaDoc attachment;
510     final boolean set;
511     final boolean add;
512     final boolean remove;
513     final int interestOps;
514
515     static InterestRequest createAddInterestRequest(SelectableChannel JavaDoc channel, Object JavaDoc attachment, int interestOps) {
516       return new InterestRequest(channel, attachment, interestOps, false, true, false);
517     }
518
519     static InterestRequest createSetInterestRequest(SelectableChannel JavaDoc channel, Object JavaDoc attachment, int interestOps) {
520       return new InterestRequest(channel, attachment, interestOps, true, false, false);
521     }
522
523     static InterestRequest createRemoveInterestRequest(SelectableChannel JavaDoc channel, Object JavaDoc attachment, int interestOps) {
524       return new InterestRequest(channel, attachment, interestOps, false, false, true);
525     }
526
527     private InterestRequest(SelectableChannel JavaDoc channel, Object JavaDoc attachment, int interestOps, boolean set, boolean add,
528                             boolean remove) {
529       Assert.eval(remove ^ set ^ add);
530       Assert.eval(channel != null);
531
532       this.channel = channel;
533       this.attachment = attachment;
534       this.set = set;
535       this.add = add;
536       this.remove = remove;
537       this.interestOps = interestOps;
538     }
539
540     public String JavaDoc toString() {
541       StringBuffer JavaDoc buf = new StringBuffer JavaDoc();
542
543       buf.append("Interest modify request: ").append(channel.toString()).append("\n");
544       buf.append("Ops: ");
545
546       if ((interestOps & SelectionKey.OP_ACCEPT) != 0) {
547         buf.append(" ACCEPT");
548       }
549
550       if ((interestOps & SelectionKey.OP_CONNECT) != 0) {
551         buf.append(" CONNECT");
552       }
553
554       if ((interestOps & SelectionKey.OP_READ) != 0) {
555         buf.append(" READ");
556       }
557
558       if ((interestOps & SelectionKey.OP_WRITE) != 0) {
559         buf.append(" WRITE");
560       }
561
562       buf.append("\n");
563
564       buf.append("Set: ").append(set).append(", Remove: ").append(remove).append(", Add: ").append(add).append("\n");
565       buf.append("Attachment: ");
566
567       if (attachment != null) {
568         buf.append(attachment.toString());
569       } else {
570         buf.append("null");
571       }
572
573       buf.append("\n");
574
575       return buf.toString();
576     }
577
578   }
579
580   // Little helper class to drive the selector. The main point of this class
581
// is to isolate the try/finally block around the entire selection process
582
private static class TCCommThread extends Thread JavaDoc {
583     final TCCommJDK14 commInstance;
584     final Set JavaDoc listeners = new HashSet JavaDoc();
585     final int number = getNextCounter();
586     final String JavaDoc baseName = "TCComm Selector Thread " + number;
587
588     private static int counter = 1;
589
590     private static synchronized int getNextCounter() {
591       return counter++;
592     }
593
594     TCCommThread(TCCommJDK14 comm) {
595       commInstance = comm;
596       setDaemon(true);
597       setName(baseName);
598
599       if (logger.isDebugEnabled()) {
600         logger.debug("Creating a new selector thread (" + toString() + ")", new Throwable JavaDoc());
601       }
602     }
603
604     String JavaDoc makeListenString(TCListener listener) {
605       StringBuffer JavaDoc buf = new StringBuffer JavaDoc();
606       buf.append("(listen ");
607       buf.append(listener.getBindAddress().getHostAddress());
608       buf.append(':');
609       buf.append(listener.getBindPort());
610       buf.append(')');
611       return buf.toString();
612     }
613
614     synchronized void listenerRemoved(TCListener listener) {
615       listeners.remove(makeListenString(listener));
616       updateThreadName();
617     }
618
619     synchronized void listenerAdded(TCListener listener) {
620       listeners.add(makeListenString(listener));
621       updateThreadName();
622     }
623
624     private void updateThreadName() {
625       StringBuffer JavaDoc buf = new StringBuffer JavaDoc(baseName);
626       for (final Iterator JavaDoc iter = listeners.iterator(); iter.hasNext();) {
627         buf.append(' ');
628         buf.append(iter.next());
629       }
630
631       setName(buf.toString());
632     }
633
634     public void run() {
635       try {
636         commInstance.selectLoop();
637       } catch (Throwable JavaDoc t) {
638         logger.error("Unhandled exception from selectLoop", t);
639         t.printStackTrace();
640       } finally {
641         commInstance.dispose();
642       }
643     }
644   }
645
646 }
647
Popular Tags