KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > corba > se > impl > transport > SelectorImpl


1 /*
2  * @(#)SelectorImpl.java 1.17 04/04/07
3  *
4  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package com.sun.corba.se.impl.transport;
9
10 import java.io.IOException JavaDoc;
11 import java.nio.channels.ClosedChannelException JavaDoc;
12 import java.nio.channels.SelectableChannel JavaDoc;
13 import java.nio.channels.SelectionKey JavaDoc;
14 import java.nio.channels.Selector JavaDoc;
15 import java.util.ArrayList JavaDoc;
16 import java.util.HashMap JavaDoc;
17 import java.util.Iterator JavaDoc;
18 import java.util.List JavaDoc;
19
20 import com.sun.corba.se.pept.broker.Broker;
21 import com.sun.corba.se.pept.transport.Acceptor;
22 import com.sun.corba.se.pept.transport.Connection;
23 import com.sun.corba.se.pept.transport.EventHandler;
24 import com.sun.corba.se.pept.transport.ListenerThread;
25 import com.sun.corba.se.pept.transport.ReaderThread;
26
27 import com.sun.corba.se.spi.logging.CORBALogDomains;
28 import com.sun.corba.se.spi.orb.ORB;
29 import com.sun.corba.se.spi.orbutil.threadpool.Work;
30 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
31 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
32
33 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
34 import com.sun.corba.se.impl.orbutil.ORBUtility;
35
36 /**
37  * @author Harold Carr
38  */

39 public class SelectorImpl
40     extends
41     Thread JavaDoc
42     implements
43     com.sun.corba.se.pept.transport.Selector
44 {
45     private ORB orb;
46     private Selector JavaDoc selector;
47     private long timeout;
48     private List JavaDoc deferredRegistrations;
49     private List JavaDoc interestOpsList;
50     private HashMap JavaDoc listenerThreads;
51     private HashMap JavaDoc readerThreads;
52     private boolean selectorStarted;
53     private boolean closed;
54     private ORBUtilSystemException wrapper ;
55
56
57     public SelectorImpl(ORB orb)
58     {
59     this.orb = orb;
60     selector = null;
61     selectorStarted = false;
62     timeout = 60000;
63     deferredRegistrations = new ArrayList JavaDoc();
64     interestOpsList = new ArrayList JavaDoc();
65     listenerThreads = new HashMap JavaDoc();
66     readerThreads = new HashMap JavaDoc();
67     closed = false;
68         wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
69     }
70
71     public void setTimeout(long timeout)
72     {
73     this.timeout = timeout;
74     }
75
76     public long getTimeout()
77     {
78     return timeout;
79     }
80
81     public void registerInterestOps(EventHandler eventHandler)
82     {
83     if (orb.transportDebugFlag) {
84         dprint(".registerInterestOps:-> " + eventHandler);
85     }
86
87     SelectionKey JavaDoc selectionKey = eventHandler.getSelectionKey();
88     if (selectionKey.isValid()) {
89             int ehOps = eventHandler.getInterestOps();
90             SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
91         synchronized(interestOpsList) {
92         interestOpsList.add(keyAndOp);
93         }
94             // tell Selector Thread there's an update to a SelectorKey's Ops
95
selector.wakeup();
96     }
97     else {
98             wrapper.selectionKeyInvalid(eventHandler.toString());
99         if (orb.transportDebugFlag) {
100         dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
101         }
102     }
103
104     if (orb.transportDebugFlag) {
105         dprint(".registerInterestOps:<- ");
106     }
107     }
108
109     public void registerForEvent(EventHandler eventHandler)
110     {
111     if (orb.transportDebugFlag) {
112         dprint(".registerForEvent: " + eventHandler);
113     }
114
115     if (isClosed()) {
116         if (orb.transportDebugFlag) {
117         dprint(".registerForEvent: closed: " + eventHandler);
118         }
119         return;
120     }
121
122     if (eventHandler.shouldUseSelectThreadToWait()) {
123         synchronized (deferredRegistrations) {
124         deferredRegistrations.add(eventHandler);
125         }
126         if (! selectorStarted) {
127         startSelector();
128         }
129         selector.wakeup();
130         return;
131     }
132
133     switch (eventHandler.getInterestOps()) {
134     case SelectionKey.OP_ACCEPT :
135         createListenerThread(eventHandler);
136         break;
137     case SelectionKey.OP_READ :
138         createReaderThread(eventHandler);
139         break;
140     default:
141         if (orb.transportDebugFlag) {
142         dprint(".registerForEvent: default: " + eventHandler);
143         }
144         throw new RuntimeException JavaDoc(
145                 "SelectorImpl.registerForEvent: unknown interest ops");
146     }
147     }
148
149     public void unregisterForEvent(EventHandler eventHandler)
150     {
151     if (orb.transportDebugFlag) {
152         dprint(".unregisterForEvent: " + eventHandler);
153     }
154
155     if (isClosed()) {
156         if (orb.transportDebugFlag) {
157         dprint(".unregisterForEvent: closed: " + eventHandler);
158         }
159         return;
160     }
161
162     if (eventHandler.shouldUseSelectThreadToWait()) {
163         SelectionKey JavaDoc selectionKey = eventHandler.getSelectionKey();
164         selectionKey.cancel();
165         selector.wakeup();
166         return;
167     }
168
169     switch (eventHandler.getInterestOps()) {
170     case SelectionKey.OP_ACCEPT :
171         destroyListenerThread(eventHandler);
172         break;
173     case SelectionKey.OP_READ :
174         destroyReaderThread(eventHandler);
175         break;
176     default:
177         if (orb.transportDebugFlag) {
178         dprint(".unregisterForEvent: default: " + eventHandler);
179         }
180         throw new RuntimeException JavaDoc(
181                 "SelectorImpl.uregisterForEvent: unknown interest ops");
182     }
183     }
184
185     public void close()
186     {
187     if (orb.transportDebugFlag) {
188         dprint(".close");
189     }
190
191     if (isClosed()) {
192         if (orb.transportDebugFlag) {
193         dprint(".close: already closed");
194         }
195         return;
196     }
197
198     setClosed(true);
199
200     Iterator JavaDoc i;
201
202     // Kill listeners.
203

204     i = listenerThreads.values().iterator();
205     while (i.hasNext()) {
206         ListenerThread listenerThread = (ListenerThread) i.next();
207         listenerThread.close();
208     }
209
210     // Kill readers.
211

212     i = readerThreads.values().iterator();
213     while (i.hasNext()) {
214         ReaderThread readerThread = (ReaderThread) i.next();
215         readerThread.close();
216     }
217
218     // Selector
219

220     try {
221         if (selector != null) {
222         // wakeup Selector thread to process close request
223
selector.wakeup();
224         }
225     } catch (Throwable JavaDoc t) {
226         if (orb.transportDebugFlag) {
227         dprint(".close: selector.close: " + t);
228         }
229     }
230     }
231
232     ///////////////////////////////////////////////////
233
//
234
// Thread methods.
235
//
236

237     public void run()
238     {
239     setName("SelectorThread");
240     while (!closed) {
241         try {
242         int n = 0;
243         if (timeout == 0 && orb.transportDebugFlag) {
244             dprint(".run: Beginning of selection cycle");
245         }
246         handleDeferredRegistrations();
247         enableInterestOps();
248         try {
249             n = selector.select(timeout);
250         } catch (IOException JavaDoc e) {
251             if (orb.transportDebugFlag) {
252             dprint(".run: selector.select: " + e);
253             }
254         }
255         if (closed) {
256             selector.close();
257             if (orb.transportDebugFlag) {
258             dprint(".run: closed - .run return");
259             }
260             return;
261         }
262         /*
263           if (timeout == 0 && orb.transportDebugFlag) {
264           dprint(".run: selector.select() returned: " + n);
265           }
266           if (n == 0) {
267           continue;
268           }
269         */

270         Iterator JavaDoc iterator = selector.selectedKeys().iterator();
271         if (orb.transportDebugFlag) {
272             if (iterator.hasNext()) {
273             dprint(".run: n = " + n);
274             }
275         }
276         while (iterator.hasNext()) {
277             SelectionKey JavaDoc selectionKey = (SelectionKey JavaDoc) iterator.next();
278             iterator.remove();
279             EventHandler eventHandler = (EventHandler)
280             selectionKey.attachment();
281             try {
282             eventHandler.handleEvent();
283             } catch (Throwable JavaDoc t) {
284             if (orb.transportDebugFlag) {
285                 dprint(".run: eventHandler.handleEvent", t);
286             }
287             }
288         }
289         if (timeout == 0 && orb.transportDebugFlag) {
290             dprint(".run: End of selection cycle");
291         }
292         } catch (Throwable JavaDoc t) {
293         // IMPORTANT: ignore all errors so the select thread keeps running.
294
// Otherwise a guaranteed hang.
295
if (orb.transportDebugFlag) {
296             dprint(".run: ignoring", t);
297         }
298         }
299     }
300     }
301
302     /////////////////////////////////////////////////////
303
//
304
// Implementation.
305
//
306

307     private synchronized boolean isClosed ()
308     {
309     return closed;
310     }
311
312     private synchronized void setClosed(boolean closed)
313     {
314     this.closed = closed;
315     }
316
317     private void startSelector()
318     {
319     try {
320         selector = Selector.open();
321     } catch (IOException JavaDoc e) {
322         if (orb.transportDebugFlag) {
323         dprint(".startSelector: Selector.open: IOException: " + e);
324         }
325         // REVISIT - better handling/reporting
326
RuntimeException JavaDoc rte =
327         new RuntimeException JavaDoc(".startSelector: Selector.open exception");
328         rte.initCause(e);
329         throw rte;
330     }
331     setDaemon(true);
332     start();
333     selectorStarted = true;
334     if (orb.transportDebugFlag) {
335         dprint(".startSelector: selector.start completed.");
336     }
337     }
338
339     private void handleDeferredRegistrations()
340     {
341     synchronized (deferredRegistrations) {
342             int deferredListSize = deferredRegistrations.size();
343             for (int i = 0; i < deferredListSize; i++) {
344                 EventHandler eventHandler =
345             (EventHandler)deferredRegistrations.get(i);
346                 if (orb.transportDebugFlag) {
347                     dprint(".handleDeferredRegistrations: " + eventHandler);
348                 }
349                 SelectableChannel JavaDoc channel = eventHandler.getChannel();
350                 SelectionKey JavaDoc selectionKey = null;
351                 try {
352                     selectionKey =
353                         channel.register(selector,
354                                          eventHandler.getInterestOps(),
355                                          (Object JavaDoc)eventHandler);
356                 } catch (ClosedChannelException JavaDoc e) {
357                     if (orb.transportDebugFlag) {
358                         dprint(".handleDeferredRegistrations: " + e);
359                     }
360                 }
361                 eventHandler.setSelectionKey(selectionKey);
362             }
363             deferredRegistrations.clear();
364         }
365     }
366
367     private void enableInterestOps()
368     {
369     synchronized (interestOpsList) {
370         int listSize = interestOpsList.size();
371         if (listSize > 0) {
372                 if (orb.transportDebugFlag) {
373                     dprint(".enableInterestOps:->");
374                 }
375                 SelectionKey JavaDoc selectionKey = null;
376         SelectionKeyAndOp keyAndOp = null;
377         int keyOp, selectionKeyOps = 0;
378         for (int i = 0; i < listSize; i++) {
379             keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
380             selectionKey = keyAndOp.selectionKey;
381
382             // Need to check if the SelectionKey is valid because a
383
// connection's SelectionKey could be put on the list to
384
// have its OP enabled and before it's enabled be reclaimed.
385
// Otherwise, the enabling of the OP will throw an exception
386
// here and exit this method an potentially not enable all
387
// registered ops.
388
//
389
// So, we ignore SelectionKeys that are invalid. They will get
390
// cleaned up on the next Selector.select() call.
391

392             if (selectionKey.isValid()) {
393                         if (orb.transportDebugFlag) {
394                             dprint(".enableInterestOps: " + keyAndOp);
395                         }
396                 keyOp = keyAndOp.keyOp;
397                 selectionKeyOps = selectionKey.interestOps();
398                 selectionKey.interestOps(selectionKeyOps | keyOp);
399             }
400         }
401         interestOpsList.clear();
402                 if (orb.transportDebugFlag) {
403                     dprint(".enableInterestOps:<-");
404                 }
405         }
406     }
407     }
408
409     private void createListenerThread(EventHandler eventHandler)
410     {
411     if (orb.transportDebugFlag) {
412         dprint(".createListenerThread: " + eventHandler);
413     }
414     Acceptor acceptor = eventHandler.getAcceptor();
415     ListenerThread listenerThread =
416         new ListenerThreadImpl(orb, acceptor, this);
417     listenerThreads.put(eventHandler, listenerThread);
418     Throwable JavaDoc throwable = null;
419     try {
420         orb.getThreadPoolManager().getThreadPool(0)
421         .getWorkQueue(0).addWork((Work)listenerThread);
422     } catch (NoSuchThreadPoolException e) {
423         throwable = e;
424     } catch (NoSuchWorkQueueException e) {
425         throwable = e;
426     }
427     if (throwable != null) {
428         RuntimeException JavaDoc rte = new RuntimeException JavaDoc(throwable.toString());
429         rte.initCause(throwable);
430         throw rte;
431     }
432     }
433
434     private void destroyListenerThread(EventHandler eventHandler)
435     {
436     if (orb.transportDebugFlag) {
437         dprint(".destroyListenerThread: " + eventHandler);
438     }
439     ListenerThread listenerThread = (ListenerThread)
440         listenerThreads.get(eventHandler);
441     if (listenerThread == null) {
442         if (orb.transportDebugFlag) {
443         dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
444         }
445         return;
446     }
447     listenerThreads.remove(eventHandler);
448     listenerThread.close();
449     }
450
451     private void createReaderThread(EventHandler eventHandler)
452     {
453     if (orb.transportDebugFlag) {
454         dprint(".createReaderThread: " + eventHandler);
455     }
456     Connection connection = eventHandler.getConnection();
457     ReaderThread readerThread =
458         new ReaderThreadImpl(orb, connection, this);
459     readerThreads.put(eventHandler, readerThread);
460     Throwable JavaDoc throwable = null;
461     try {
462         orb.getThreadPoolManager().getThreadPool(0)
463         .getWorkQueue(0).addWork((Work)readerThread);
464     } catch (NoSuchThreadPoolException e) {
465         throwable = e;
466     } catch (NoSuchWorkQueueException e) {
467         throwable = e;
468     }
469     if (throwable != null) {
470         RuntimeException JavaDoc rte = new RuntimeException JavaDoc(throwable.toString());
471         rte.initCause(throwable);
472         throw rte;
473     }
474     }
475
476     private void destroyReaderThread(EventHandler eventHandler)
477     {
478     if (orb.transportDebugFlag) {
479         dprint(".destroyReaderThread: " + eventHandler);
480     }
481     ReaderThread readerThread = (ReaderThread)
482         readerThreads.get(eventHandler);
483     if (readerThread == null) {
484         if (orb.transportDebugFlag) {
485         dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
486         }
487         return;
488     }
489     readerThreads.remove(eventHandler);
490     readerThread.close();
491     }
492
493     private void dprint(String JavaDoc msg)
494     {
495     ORBUtility.dprint("SelectorImpl", msg);
496     }
497
498     protected void dprint(String JavaDoc msg, Throwable JavaDoc t)
499     {
500     dprint(msg);
501     t.printStackTrace(System.out);
502     }
503
504     // Private class to contain a SelectionKey and a SelectionKey op.
505
// Used only by SelectorImpl to register and enable SelectionKey
506
// Op.
507
// REVISIT - Could do away with this class and use the EventHanlder
508
// directly.
509
private class SelectionKeyAndOp
510     {
511         // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
512
public int keyOp;
513         public SelectionKey JavaDoc selectionKey;
514
515         // constructor
516
public SelectionKeyAndOp(SelectionKey JavaDoc selectionKey, int keyOp) {
517         this.selectionKey = selectionKey;
518         this.keyOp = keyOp;
519     }
520     }
521
522 // End of file.
523
}
524
525
Popular Tags