1 package com.ubermq.kernel; 2 3 import EDU.oswego.cs.dl.util.concurrent.*; 4 import com.ubermq.kernel.event.*; 5 import java.io.*; 6 import java.nio.*; 7 import java.nio.channels.*; 8 import java.util.*; 9 10 13 public class ConnectionInfo 14 extends AbstractConnectionInfo 15 { 16 private WritableByteChannel out; 17 private ReadableByteChannel in; 18 19 private ReadWriteTransformThread writeHandler; 20 21 27 public ConnectionInfo(IMessageProcessor p, 28 IDatagramFactory f) 29 { 30 super(p,f); 31 } 32 33 void setIOHandler(ReadWriteTransformThread rwtt, int operation) 34 { 35 if (operation == SelectionKey.OP_WRITE) 36 this.writeHandler = rwtt; 37 else if (operation == SelectionKey.OP_READ) 38 rwtt.requestService(this); 39 } 40 41 public void start() 42 { 43 shouldProcess = true; 44 } 45 46 public void stop() 47 { 48 shouldProcess = false; 49 } 50 51 public ReadableByteChannel in() {return in;} 52 public WritableByteChannel out() {return out;} 53 54 59 public void attach(ReadableByteChannel in, WritableByteChannel out) 60 { 61 this.in = in; 62 this.out = out; 63 } 64 65 79 protected void requestWrite() 80 throws IOException 81 { 82 if (writeHandler == null) 83 throw new IllegalStateException ("writeHandler cannot be null in requestWrite()"); 84 85 if (super.readyToWrite()) 86 { 87 writeHandler.requestService(this); 88 } 89 } 90 91 protected void cancelWriteRequest() 92 { 93 if (writeHandler == null) 94 throw new IllegalStateException ("writeHandler cannot be null in cancelWriteRequest()"); 95 96 writeHandler.cancelServiceRequest(this); 97 } 98 99 public int doWrite(ByteBuffer writeBuffer) 100 throws java.io.IOException 101 { 102 return out().write(writeBuffer); 103 } 104 105 110 void readFrom(ReadableByteChannel channel, 111 SelectionKey key) 112 { 113 ByteBuffer readBuffer = null; 114 try 115 { 116 readBuffer = getReadBuffer(); 117 int n = channel.read(readBuffer); 118 119 if (n == -1) 122 { 123 key.cancel(); 124 125 close(); 127 } 128 } 129 catch(java.io.IOException iox) 130 { 131 sendEvent(ConnectionEvent.CONNECTION_IO_EXCEPTION); 132 close(); 133 } 134 catch(InterruptedException ie) 135 { 136 return; 138 } 139 finally 140 { 141 releaseReadBuffer(readBuffer); 142 } 143 144 processData(); 146 } 147 } 148 | Popular Tags |