KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > kernel > ReadWriteTransformThread


1 package com.ubermq.kernel;
2
3 import EDU.oswego.cs.dl.util.concurrent.*;
4 import com.ubermq.kernel.*;
5 import com.ubermq.kernel.event.*;
6 import java.io.*;
7 import java.nio.channels.*;
8 import java.util.*;
9 import org.apache.log4j.*;
10
11 /**
12  * A generic, selector-based I/O thread that works with a <code>ConnectionList</code>
13  * to add connections for I/O servicing.
14  */

15 public final class ReadWriteTransformThread
16     extends Thread JavaDoc
17 {
18     private static final Logger log = Logger.getLogger(ReadWriteTransformThread.class);
19
20     private Selector selector;
21     private List toRegister, toUnregister, toEnable, toDisable;
22     private Object JavaDoc serviceNotifier;
23     private int operation;
24     private int spinDetector;
25     private static final int THRESHOLD = 1000000;
26
27     /**
28      * Constructs an I/O processing thread.<P>
29      *
30      * @param selector The selector indicating READ readiness.
31      * @param acceptedConnections The ConnectionList used to add connections for service
32      * @param operation an opcode for use with the selector, one of <code>SelectionKey.OP_xxx</code>
33      */

34     public ReadWriteTransformThread(int operation)
35         throws IOException
36     {
37         super("Channel-Based NIO " + (operation == SelectionKey.OP_READ ? "reader" : "writer"));
38         setDaemon(true);
39
40         this.selector = Selector.open();
41         this.toRegister = Collections.synchronizedList(new LinkedList());
42         this.toUnregister = Collections.synchronizedList(new LinkedList());
43         this.toEnable = Collections.synchronizedList(new LinkedList());
44         this.toDisable = Collections.synchronizedList(new LinkedList());
45         this.serviceNotifier = new Object JavaDoc();
46         this.operation = operation;
47         this.spinDetector = 0;
48     }
49
50     /**
51      * Executes the I/O thread. This will service connections
52      * that are ready for reading as indicated by the selector, and
53      * will add new connections as they become available in the
54      * connection list.
55      */

56     public void run()
57     {
58         spinDetector = 0;
59         while(!isInterrupted())
60         {
61             try
62             {
63                 registerNewChannels();
64                 int n = selector.select();
65
66                 if (n > 0)
67                 {
68                     spinDetector = 0;
69                     acceptPendingRequests();
70                 }
71                 else
72                 {
73                     ++spinDetector;
74                     if (spinDetector > THRESHOLD)
75                     {
76                         log.debug("Network loss detected! Selector observed " + spinDetector + " consecutive unblocked select() calls without results");
77                         spinDetector = 0;
78
79                         // attempt to make a new selector
80
Selector newSelector = Selector.open();
81                         Iterator iter = selector.keys().iterator();
82                         while (iter.hasNext())
83                         {
84                             SelectionKey sk = (SelectionKey)iter.next();
85                             ConnectionInfo ci = (ConnectionInfo)sk.attachment();
86
87                             // cancel the old key
88
int ops = sk.interestOps();
89                             sk.cancel();
90
91                             // register new key
92
SelectableChannel channel = getChannelForConnection(ci);
93                             channel.register(newSelector, ops, ci);
94                         }
95                         this.selector.close();
96                         this.selector = newSelector;
97                         log.debug("Successfully created new selector.");
98                     }
99                 }
100             }
101             catch(Exception JavaDoc ex)
102             {
103                 log.debug("Error in select() block", ex);
104             }
105         }
106
107         // kill all associated connections
108
Iterator iter = selector.keys().iterator();
109         while (iter.hasNext())
110         {
111             SelectionKey key = (SelectionKey)iter.next();
112             key.cancel();
113
114             ConnectionInfo conn = (ConnectionInfo)key.attachment();
115             conn.close();
116         }
117     }
118
119     /**
120      * Registers the connection with this transform IO thread for later
121      * servicing. This method can be performed synchronously, if desired.<P>
122      *
123      * @param ci the connection
124      * @param sync true if the method should block until registration has
125      * completed.
126      */

127     public void register(ConnectionInfo ci, boolean sync)
128     {
129         synchronized(serviceNotifier)
130         {
131             toRegister.add(ci);
132             selector.wakeup();
133          
134             if (sync)
135             {
136                 try
137                 {
138                     serviceNotifier.wait();
139                 }
140                 catch (InterruptedException JavaDoc e)
141                 {
142                     log.debug("interrupted waiting for registration", e);
143                 }
144             }
145         }
146     }
147
148     public void unregister(ConnectionInfo ci)
149     {
150         toUnregister.add(ci);
151         selector.wakeup();
152     }
153
154     /**
155      * Requests service for this connection. If a service cancellation
156      * request is still pending for this connection, it is removed.<P>
157      *
158      * @param ci the connection to service
159      */

160     public void requestService(ConnectionInfo ci)
161     {
162         toEnable.add(ci);
163         toDisable.remove(ci);
164         selector.wakeup();
165     }
166
167     /**
168      * Cancels a service request for this connection. The cancellation
169      * request will not be honored if there is another caller who has
170      * registered a pending service request for the same connection. This
171      * is to prevent a situation where one thread requests service and
172      * another thread, just having been serviced, cancels it. <P>
173      *
174      * In such a case, the cancellation request is ignored and the pending
175      * service request takes precedence.<P>
176      *
177      * @param ci the connection to cancel service for
178      */

179     public void cancelServiceRequest(ConnectionInfo ci)
180     {
181         if (!toEnable.contains(ci))
182         {
183             toDisable.add(ci);
184             selector.wakeup();
185         }
186     }
187
188     private void registerNewChannels() throws IOException
189     {
190         synchronized(toRegister)
191         {
192             // register channels
193
Iterator iter = toRegister.iterator();
194             while (iter.hasNext())
195             {
196                 ConnectionInfo conn = (ConnectionInfo)iter.next();
197                 if (conn.in() instanceof SelectableChannel)
198                 {
199                     doRegister(conn);
200                 }
201                 else
202                 {
203                     // ignore the new connection
204
}
205             }
206             toRegister.clear();
207         }
208
209         synchronized(toUnregister)
210         {
211             Iterator iter = toUnregister.iterator();
212             while (iter.hasNext())
213             {
214                 ConnectionInfo conn = (ConnectionInfo)iter.next();
215                 SelectableChannel channel = getChannelForConnection(conn);
216                 SelectionKey sk = channel.keyFor(selector);
217                 if (sk != null)
218                 {
219                     sk.cancel();
220                 }
221                 else
222                     log.debug("can't unregister - nothing known about " + conn);
223             }
224             toUnregister.clear();
225         }
226
227         synchronized(toEnable)
228         {
229             // enable
230
Iterator iter = toEnable.iterator();
231             while(iter.hasNext())
232             {
233                 ConnectionInfo conn = (ConnectionInfo)iter.next();
234                 SelectableChannel channel = getChannelForConnection(conn);
235                 SelectionKey sk = channel.keyFor(selector);
236                 if (sk != null)
237                 {
238                     try
239                     {
240                         sk.interestOps(operation);
241                         log.debug("enabled " + conn + " for op " + getFriendlyOpName(operation));
242                     }
243                     catch (CancelledKeyException e)
244                     {
245                         log.debug("can't enable cancelled key " + sk + " for " + conn, e);
246                     }
247                 }
248                 else
249                     log.debug("can't enable - nothing known about " + conn);
250             }
251             toEnable.clear();
252         }
253
254         synchronized(toDisable)
255         {
256             // disable
257
Iterator iter = toDisable.iterator();
258             while(iter.hasNext())
259             {
260                 ConnectionInfo conn = (ConnectionInfo)iter.next();
261                 SelectableChannel channel = getChannelForConnection(conn);
262                 SelectionKey sk = channel.keyFor(selector);
263                 if (sk != null)
264                 {
265                     try
266                     {
267                         sk.interestOps(0);
268                         log.debug("disabled " + conn + " for op " + getFriendlyOpName(operation));
269                     }
270                     catch(CancelledKeyException cke)
271                     {
272                         // ignore this
273
log.debug("can't disable - already cancelled " + conn);
274                     }
275                 }
276                 else
277                     log.debug("can't disable - nothing known about " + conn);
278             }
279             toDisable.clear();
280         }
281         
282         // DONE, notify those who are waiting on us.
283
synchronized(serviceNotifier)
284         {
285             serviceNotifier.notifyAll();
286         }
287     }
288     
289     private String JavaDoc getFriendlyOpName(int op)
290     {
291         if (op == SelectionKey.OP_READ)
292             return "OP_READ";
293         else if (op == SelectionKey.OP_WRITE)
294             return "OP_WRITE";
295         else if (op == SelectionKey.OP_ACCEPT)
296             return "OP_ACCEPT";
297         else
298             return String.valueOf(op);
299     }
300
301     /**
302      * Returns the appropriate in or out channel for a connection and
303      * our activated operation.<P>
304      */

305     private SelectableChannel getChannelForConnection(ConnectionInfo conn)
306     {
307         return operation == SelectionKey.OP_WRITE ?
308             (SelectableChannel)conn.out() :
309             (SelectableChannel)conn.in();
310     }
311
312     private void doRegister(final ConnectionInfo conn)
313         throws IOException
314     {
315         SelectableChannel channel = getChannelForConnection(conn);
316         log.debug("registering " + getFriendlyOpName(operation) + ": conn " + conn + ", new channel is " + channel);
317         channel.configureBlocking(false);
318
319         // register it
320
channel.register(selector, 0, conn);
321
322         // tell the connection about us
323
conn.setIOHandler(this, operation);
324
325         // register a close event handler,
326
// so we can dispose of the connection selection key
327
// when it is closed.
328
conn.addEventListener(new ConnectionEventListener()
329                               {
330                     public void connectionEvent(ConnectionEvent e)
331                     {
332                         if (e.getEventCode() == ConnectionEvent.CONNECTION_CLOSED ||
333                             e.getEventCode() == ConnectionEvent.CONNECTION_IO_EXCEPTION)
334                         {
335                             unregister(conn);
336                         }
337                     }
338                 });
339     }
340
341     private synchronized void acceptPendingRequests()
342         throws IOException
343     {
344         Set readyKeys = selector.selectedKeys();
345
346         for(Iterator i = readyKeys.iterator(); i.hasNext(); )
347         {
348             SelectionKey key = (SelectionKey)i.next();
349             i.remove();
350
351             ConnectionInfo conn = (ConnectionInfo)key.attachment();
352             if (!key.isValid())
353             {
354                 log.debug("key no longer valid, skipping");
355                 continue;
356             }
357             else
358             {
359                 if (operation == SelectionKey.OP_WRITE &&
360                     key.isWritable())
361                 {
362                     conn.flush();
363                 }
364                 else if (operation == SelectionKey.OP_READ &&
365                          key.isReadable())
366                 {
367                     ReadableByteChannel incomingChannel = (ReadableByteChannel)key.channel();
368
369                     // read the data from the channel
370
conn.readFrom(incomingChannel, key);
371                 }
372             }
373         }
374     }
375
376 }
377
378
Popular Tags