KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > kernel > AbstractConnectionInfo


package com.ubermq.kernel;

import java.io.*;
import java.nio.*;
import java.util.*;

import org.apache.log4j.*;

import EDU.oswego.cs.dl.util.concurrent.*;

import com.ubermq.kernel.event.*;
import com.ubermq.util.*;

14 /**
15  * Implements the IConnectionInfo as an abstract base class, and
16  * provides typically useful functionality including a read buffer,
17  * a write buffer, exception handling, and interaction with a datagram
18  * factory and message processor. Most things that a datagram-aware
19  * connection will be doing are implemented to a large extent herein.
20  */

21 public abstract class AbstractConnectionInfo
22     implements IConnectionInfo
23 {
24     private static final Logger log = Logger.getLogger(AbstractConnectionInfo.class);
25
26     private String JavaDoc id;
27
28     /**
29      * Indicates whether datagrams read from this connection
30      * should be passed to the message processor. When set to false,
31      * incoming datagrams are discarded.
32      */

33     protected boolean shouldProcess;
34
35     private SynchronizedBoolean open;
36
37     private ByteBuffer readBuffer;
38     private ByteBuffer writeBuffer;
39
40     private Sync readMutex, writeMutex;
41
42     private List eventHandlers;
43
44     // statics
45
private static long nextId = 2;
46
47     /**
48      * The read/write buffer size, as specified by the global property configurator.
49      */

50     protected static final int MAX_READ = Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_BUFFER_SIZE,
51                                                                                    "1048576")).intValue();
52
53     /**
54      * The flush threshold, as specified by the global configurator.
55      */

56     protected static final int FLUSH_BUFFER_THRESHOLD = MAX_READ / Integer.valueOf(Configurator.getProperty(ConfigConstants.GENERAL_CONNECTION_FLUSH_DIVISOR, "2")).intValue();
57
58     // the message processor. very important.
59
private final IMessageProcessor proc;
60
61     // the datagram factory. also very important.
62
private final IDatagramFactory factory;
63
64     /**
65      * Uses buffer sizes from the global configurator.
66      *
67      * @param p the MessageProcessor that will handle datagrams as they are read in
68      * from the read buffer.
69      * @param f the DatagramFactory that will process raw byte streams and perform
70      * framing and interpretation.
71      */

72     public AbstractConnectionInfo(IMessageProcessor p,
73                                   IDatagramFactory f)
74     {
75         this(p,
76              f,
77              MAX_READ,
78              MAX_READ);
79     }
80
81     /**
82      * @param p the MessageProcessor that will handle datagrams as they are read in
83      * from the read buffer.
84      * @param f the DatagramFactory that will process raw byte streams and perform
85      * framing and interpretation.
86      * @param rbuf the size, in bytes, of the read buffer.
87      * @param wbuf the size, in bytes, of the write buffer.
88      */

89     public AbstractConnectionInfo(IMessageProcessor p,
90                                   IDatagramFactory f,
91                                   int rbuf,
92                                   int wbuf)
93     {
94         this(p,
95              f,
96                  (rbuf > 0) ? ByteBuffer.allocateDirect(rbuf) : null,
97                  (wbuf > 0) ? ByteBuffer.allocateDirect(wbuf) : null);
98     }
99
100     /**
101      * @param p the MessageProcessor that will handle datagrams as they are read in
102      * from the read buffer.
103      * @param f the DatagramFactory that will process raw byte streams and perform
104      * framing and interpretation.
105      * @param r the actual ByteBuffer used as the read buffer
106      * @param w the actual ByteBuffer used as the write buffer
107      */

108     public AbstractConnectionInfo(IMessageProcessor p,
109                                   IDatagramFactory f,
110                                   ByteBuffer r,
111                                   ByteBuffer w)
112     {
113         readBuffer = r;
114         if (readBuffer != null)
115         {
116             readMutex = new ReentrantLock();
117         }
118
119         writeBuffer = w;
120         if (writeBuffer != null)
121         {
122             writeMutex = new ReentrantLock();
123         }
124
125         this.factory = f;
126         id = String.valueOf(allocateProcessUniqueId());
127         shouldProcess = true;
128         open = new SynchronizedBoolean(true);
129         eventHandlers = new LinkedList();
130         proc = p;
131
132         // send connected event
133
sendEvent(ConnectionEvent.CONNECTION_CONNECTED);
134     }
135
136     public void close()
137     {
138         if (open.get())
139         {
140             open.set(false);
141             proc.remove(this);
142
143             // make sure everything is out of the
144
// wbuffer
145
assert writeBuffer.position() == 0 : "buffer not empty";
146
147             // closed normally (or abnormally)
148
sendEvent(ConnectionEvent.CONNECTION_CLOSED);
149         }
150     }
151
152     public boolean isOpen()
153     {
154         return open.get();
155     }
156
157     public void addEventListener(ConnectionEventListener l)
158     {
159         eventHandlers.add(l);
160     }
161
162     public void removeEventListener(ConnectionEventListener l)
163     {
164         eventHandlers.remove(l);
165     }
166
167     /**
168      * Sends an event to all the registered event listeners.
169      *
170      * @param event the event object
171      */

172     protected void sendEvent(ConnectionEvent event)
173     {
174         log.debug("sending connection event " + event);
175
176         Iterator iter = eventHandlers.iterator();
177         while (iter.hasNext())
178         {
179             ConnectionEventListener l = (ConnectionEventListener)iter.next();
180             try
181             {
182                 l.connectionEvent(event);
183             }
184             catch(RuntimeException JavaDoc x)
185             {
186                 // listeners should not throw a runtime exception.
187
// move on.
188
log.fatal("", x);
189             }
190         }
191     }
192
193     /**
194      * Sends an event, using the code specified.
195      * @param eventCode an event code
196      */

197     void sendEvent(int eventCode)
198     {
199         sendEvent(new ConnectionEvent(this, eventCode));
200     }
201
202     public static synchronized long allocateProcessUniqueId()
203     {
204         return ++nextId;
205     }
206
207     /**
208      * Output a datagram. If we run out of output buffer space,
209      * we call h.overflow() to potentially fix the situation.
210      * if overflow() returns true, we will attempt the output operation
211      * again and repeat the process using the overflow handler returned
212      * from h.getRetryHandler(). <p>
213      *
214      * In this way, it is possible to create a sequence of overflow handling
215      * logic that is a markov process. If the overflow() routine ever returns
216      * false, we abort the output operation.
217      *
218      * @throws IOException if the output fails due to I/O failure, or we are
219      * not open.
220      */

221     public void output(IDatagram d, IOverflowHandler h)
222         throws IOException, BufferOverflowException
223     {
224         if (!open.get())
225             throw new IOException("not open");
226
227         try
228         {
229             while(open.get())
230             {
231                 try
232                 {
233                     writeMutex.acquire();
234
235                     // make a sandbox for the output framer
236
ByteBuffer output = writeBuffer.slice();
237                     factory.outgoing(output, d);
238
239                     // update the write buffer position
240
writeBuffer.position(writeBuffer.position() + output.position());
241                     break;
242                 }
243                 catch(BufferOverflowException boe)
244                 {
245                 }
246                 finally
247                 {
248                     writeMutex.release();
249                 }
250
251                 // we just ran out of space.
252
// please write some bytes.
253
requestWrite();
254
255                 // handle it
256
int a = processOverflow(d, h);
257                 if (a == IOverflowHandler.ACTION_RETRY)
258                 {
259                     h = h.getRetryHandler();
260                 }
261                 else if (a == IOverflowHandler.ACTION_FAIL)
262                     throw new BufferOverflowException();
263                 else
264                     break; // IGNORE - we quit.
265
}
266
267             // flush the buffers
268
requestWrite();
269         }
270         catch(InterruptedException JavaDoc ie)
271         {
272             // abort the current operation
273
// we dont' have the mutex, so we can't
274
// do anything to the buffer.
275
}
276     }
277
278     /**
279      * Processes an overflow using the specified handler. This determines
280      * if the handler can support extra connection information, and gives it
281      * if so.
282      */

283     private int processOverflow(IDatagram d,
284                                 IOverflowHandler h)
285     {
286         if (h instanceof IConnectionOverflowHandler)
287         {
288             return ((IConnectionOverflowHandler)h).overflow(d, this, proc);
289         }
290         else
291         {
292             return h.overflow(d);
293         }
294     }
295
296     public void flush()
297         throws IOException
298     {
299         try
300         {
301             writeMutex.acquire();
302             doFlush();
303         }
304         catch(InterruptedException JavaDoc ie)
305         {
306             // abort the current operation
307
// we dont' have the mutex, so we can't
308
// do anything to the buffer.
309
}
310         finally
311         {
312             writeMutex.release();
313         }
314     }
315
316     /**
317      * this method should be called when the caller has the
318      * mutex on the write buffer.
319      */

320     private void doFlush()
321         throws IOException
322     {
323         // write the data out to the channel if there is any data.
324
// if there's no data, this method will cause the
325
// connection info to remember the write attempt and will
326
// subsequently return true from readyToWrite()
327
try
328         {
329             if (writeBuffer.position() > 0)
330             {
331                 writeBuffer.flip();
332                 int n = doWrite(writeBuffer);
333                 log.debug(this + " flushed " + n + " octets");
334                 efficientCompact(writeBuffer);
335             }
336
337             // if we're done, cancel the write request
338
if (writeBuffer.position() == 0)
339                 cancelWriteRequest();
340         }
341         catch(java.io.IOException JavaDoc iox)
342         {
343             log.debug("Unable to write bytes", iox);
344             
345             // tell the listeners
346
sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION);
347
348             // free resources. this connection is no longer usable.
349
cancelWriteRequest();
350             close();
351
352             // propagate the exception
353
throw iox;
354         }
355     }
356
357     /**
358      * Writes the contents of the write buffer to its ultimate destination.
359      * The position of the buffer will be set to zero and the limit will
360      * indicate the number of valid bytes in the buffer on input. When the method
361      * returns, the position of buffer will indicate how many bytes were
362      * written to the destination. Bytes before the position of the buffer
363      * when the method returns may be discarded.
364      */

365     protected abstract int doWrite(ByteBuffer writeBuffer)
366         throws java.io.IOException JavaDoc;
367
368     protected void requestWrite()
369         throws IOException
370     {
371         doFlush();
372     }
373
374     protected void cancelWriteRequest()
375     {
376     }
377
378     protected boolean readyToWrite()
379     {
380         try
381         {
382             writeMutex.acquire();
383             return writeBuffer.hasRemaining();
384         }
385         catch(InterruptedException JavaDoc ie)
386         {
387             // abort the current operation
388
// we dont' have the mutex, so we can't
389
// do anything to the buffer.
390
return false;
391         }
392         finally
393         {
394             writeMutex.release();
395         }
396     }
397
398     //////// READ METHODS
399

400     /**
401      * Requests access to the read buffer for an input operation. This method is necessary
402      * in order to obtain the mutex protecting this buffer.
403      */

404     protected ByteBuffer getReadBuffer()
405         throws InterruptedException JavaDoc
406     {
407         readMutex.acquire();
408         return readBuffer;
409     }
410
411     /**
412      * Releases the mutex on the read buffer, indicating that the input operation
413      * is completed.
414      */

415     protected void releaseReadBuffer(ByteBuffer rb)
416     {
417         readMutex.release();
418     }
419
420     /**
421      * Processes data in the read buffer using the datagram factory specified
422      * at creation time.
423      */

424     public void processData()
425     {
426         try
427         {
428             readMutex.acquire();
429
430             // make a view on the data buffer.
431
int expecting=0;
432             preProcessData();
433
434             // iterate through the view's data until we run out.
435
while(true)
436             {
437                 // FRAMING
438
// call the datagram factory to figure out
439
// how much data we need.
440
expecting = factory.frame(readBuffer);
441
442                 // PROCESS
443
// if we have enough data.
444
// if we don't, go back to the I/O processor.
445
if (readBuffer.remaining() >= expecting)
446                 {
447                     // make a new process buffer.
448
ByteBuffer process = readBuffer.slice();
449                     process.limit(expecting);
450
451                     // read past the data so the buffer position is right after
452
// the datagram we just read. this is an important
453
// step to take for subclasses who may want to record
454
// where in the buffer datagrams begin & end.
455
readBuffer.position(readBuffer.position() + expecting);
456
457                     // go process it
458
if (shouldProcess)
459                     {
460                         // load the datagram.
461
IDatagram d = factory.incoming(process);
462
463                         // now we'll process it according to our RULES.
464
proc.process(this, d);
465                     }
466                 }
467                 else
468                 {
469                     break;
470                 }
471             }
472         }
473         catch (java.io.IOException JavaDoc ise)
474         {
475             log.debug("Invalid Protocol Detected", ise);
476
477             // invalid data event.
478
sendEvent(ConnectionEvent.CONNECTION_INVALID_PROTOCOL);
479
480             // our read buffer is unintelligible.
481
// we have no choice but to close the connection.
482
close();
483         }
484         catch (InterruptedException JavaDoc ie)
485         {
486             // NOTHING TO DO HERE.
487
}
488         finally
489         {
490             // do post processing cleanup
491
postProcessData();
492
493             // done
494
readMutex.release();
495         }
496     }
497
498     /**
499      * prepares for a sequence of read operations
500      * position <= limit and remaining() will reflect the number
501      * of bytes processed.
502      */

503     protected void preProcessData()
504     {
505         readBuffer.flip();
506     }
507
508     /**
509      * performs post processing after interpreting the contents of the read buffer.
510      */

511     protected void postProcessData()
512     {
513         // discard the processed data.
514
efficientCompact(readBuffer);
515     }
516
517     /**
518      * Compacts a buffer. This is optimized here to avoid
519      * a call to copyMemory() inside the NIO library that
520      * is made even if there are no bytes to copy, i.e.
521      * <code>remaining</code> returns 0.
522      */

523     private void efficientCompact(ByteBuffer bb)
524     {
525         if (bb.hasRemaining())
526         {
527             bb.compact();
528         }
529         else
530             bb.clear();
531     }
532
533     public String JavaDoc toString()
534     {
535         return getId();
536     }
537
538     public final String JavaDoc getId()
539     {
540         return id;
541     }
542
543     public boolean equals(Object JavaDoc o)
544     {
545         try
546         {
547             return (getId().equals( ((ConnectionInfo)o).getId()));
548         }
549         catch (ClassCastException JavaDoc e)
550         {
551             return false;
552         }
553     }
554
555     public int hashCode()
556     {
557         return getId().hashCode();
558     }
559 }
560
561
Popular Tags