1 18 package org.apache.activemq.transport.nio; 19 20 import java.io.DataInputStream ; 21 import java.io.DataOutputStream ; 22 import java.io.EOFException ; 23 import java.io.IOException ; 24 import java.net.Socket ; 25 import java.net.URI ; 26 import java.net.UnknownHostException ; 27 import java.nio.ByteBuffer ; 28 import java.nio.channels.SelectionKey ; 29 import java.nio.channels.SocketChannel ; 30 31 import javax.net.SocketFactory; 32 33 import org.apache.activemq.wireformat.WireFormat; 34 import org.apache.activemq.command.Command; 35 import org.apache.activemq.transport.Transport; 36 import org.apache.activemq.transport.tcp.TcpTransport; 37 import org.apache.activemq.util.IOExceptionSupport; 38 import org.apache.activemq.util.ServiceStopper; 39 40 45 public class NIOTransport extends TcpTransport { 46 47 private SocketChannel channel; 49 private SelectorSelection selection; 50 private ByteBuffer inputBuffer; 51 private ByteBuffer currentBuffer; 52 private int nextFrameSize; 53 54 public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException , IOException { 55 super(wireFormat, socketFactory, remoteLocation, localLocation); 56 } 57 58 public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException { 59 super(wireFormat, socket); 60 } 61 62 protected void initializeStreams() throws IOException { 63 channel = socket.getChannel(); 64 channel.configureBlocking(false); 65 66 selection = SelectorManager.getInstance().register(channel, 68 new SelectorManager.Listener() { 69 public void onSelect(SelectorSelection selection) { 70 serviceRead(); 71 } 72 public void onError(SelectorSelection selection, Throwable error) { 73 if( error instanceof IOException ) { 74 onException((IOException ) error); 75 } else { 76 onException(IOExceptionSupport.create(error)); 77 } 78 } 79 }); 80 81 inputBuffer = ByteBuffer.allocate(8*1024); 84 currentBuffer = inputBuffer; 85 nextFrameSize=-1; 86 currentBuffer.limit(4); 87 this.dataOut = new DataOutputStream (new NIOOutputStream(channel, 16*1024)); 88 89 } 90 91 private void serviceRead() { 92 try { 93 while( true ) { 94 95 96 int readSize = channel.read(currentBuffer); 97 if( readSize == -1 ) { 98 onException(new EOFException ()); 99 selection.close(); 100 break; 101 } 102 if( readSize==0 ) { 103 break; 104 } 105 106 if( currentBuffer.hasRemaining() ) 107 continue; 108 109 if( nextFrameSize==-1 ) { 111 assert inputBuffer == currentBuffer; 112 113 inputBuffer.flip(); 116 nextFrameSize = inputBuffer.getInt()+4; 117 if( nextFrameSize > inputBuffer.capacity() ) { 118 currentBuffer = ByteBuffer.allocate(nextFrameSize); 119 currentBuffer.putInt(nextFrameSize); 120 } else { 121 inputBuffer.limit(nextFrameSize); 122 } 123 124 } else { 125 currentBuffer.flip(); 126 127 Object command = wireFormat.unmarshal(new DataInputStream (new NIOInputStream(currentBuffer))); 128 doConsume((Command) command); 129 130 nextFrameSize=-1; 131 inputBuffer.clear(); 132 inputBuffer.limit(4); 133 currentBuffer = inputBuffer; 134 } 135 136 } 137 138 } catch (IOException e) { 139 onException(e); 140 } catch (Throwable e) { 141 onException(IOExceptionSupport.create(e)); 142 } 143 } 144 145 146 protected void doStart() throws Exception { 147 connect(); 148 selection.setInterestOps(SelectionKey.OP_READ); 149 selection.enable(); 150 } 151 152 protected void doStop(ServiceStopper stopper) throws Exception { 153 selection.disable(); 154 super.doStop(stopper); 155 } 156 } 157 | Popular Tags |