KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > nio > NIOTransport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.nio;
19
20 import java.io.DataInputStream JavaDoc;
21 import java.io.DataOutputStream JavaDoc;
22 import java.io.EOFException JavaDoc;
23 import java.io.IOException JavaDoc;
24 import java.net.Socket JavaDoc;
25 import java.net.URI JavaDoc;
26 import java.net.UnknownHostException JavaDoc;
27 import java.nio.ByteBuffer JavaDoc;
28 import java.nio.channels.SelectionKey JavaDoc;
29 import java.nio.channels.SocketChannel JavaDoc;
30
31 import javax.net.SocketFactory;
32
33 import org.apache.activemq.wireformat.WireFormat;
34 import org.apache.activemq.command.Command;
35 import org.apache.activemq.transport.Transport;
36 import org.apache.activemq.transport.tcp.TcpTransport;
37 import org.apache.activemq.util.IOExceptionSupport;
38 import org.apache.activemq.util.ServiceStopper;
39
40 /**
41  * An implementation of the {@link Transport} interface using raw tcp/ip
42  *
43  * @version $Revision$
44  */

45 public class NIOTransport extends TcpTransport {
46
47     //private static final Log log = LogFactory.getLog(NIOTransport.class);
48
private SocketChannel JavaDoc channel;
49     private SelectorSelection selection;
50     private ByteBuffer JavaDoc inputBuffer;
51     private ByteBuffer JavaDoc currentBuffer;
52     private int nextFrameSize;
53
54     public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI JavaDoc remoteLocation, URI JavaDoc localLocation) throws UnknownHostException JavaDoc, IOException JavaDoc {
55         super(wireFormat, socketFactory, remoteLocation, localLocation);
56     }
57
58     public NIOTransport(WireFormat wireFormat, Socket JavaDoc socket) throws IOException JavaDoc {
59         super(wireFormat, socket);
60     }
61
62     protected void initializeStreams() throws IOException JavaDoc {
63         channel = socket.getChannel();
64         channel.configureBlocking(false);
65         
66         // listen for events telling us when the socket is readable.
67
selection = SelectorManager.getInstance().register(channel,
68                 new SelectorManager.Listener() {
69                     public void onSelect(SelectorSelection selection) {
70                         serviceRead();
71                     }
72                     public void onError(SelectorSelection selection, Throwable JavaDoc error) {
73                         if( error instanceof IOException JavaDoc ) {
74                             onException((IOException JavaDoc) error);
75                         } else {
76                             onException(IOExceptionSupport.create(error));
77                         }
78                     }
79                 });
80         
81         // Send the data via the channel
82
// inputBuffer = ByteBuffer.allocateDirect(8*1024);
83
inputBuffer = ByteBuffer.allocate(8*1024);
84         currentBuffer = inputBuffer;
85         nextFrameSize=-1;
86         currentBuffer.limit(4);
87         this.dataOut = new DataOutputStream JavaDoc(new NIOOutputStream(channel, 16*1024));
88         
89     }
90     
91     private void serviceRead() {
92         try {
93             while( true ) {
94                 
95     
96                 int readSize = channel.read(currentBuffer);
97                 if( readSize == -1 ) {
98                     onException(new EOFException JavaDoc());
99                     selection.close();
100                     break;
101                 }
102                 if( readSize==0 ) {
103                     break;
104                 }
105                 
106                 if( currentBuffer.hasRemaining() )
107                     continue;
108
109                 // Are we trying to figure out the size of the next frame?
110
if( nextFrameSize==-1 ) {
111                     assert inputBuffer == currentBuffer;
112
113                     // If the frame is too big to fit in our direct byte buffer,
114
// Then allocate a non direct byte buffer of the right size for it.
115
inputBuffer.flip();
116                     nextFrameSize = inputBuffer.getInt()+4;
117                     if( nextFrameSize > inputBuffer.capacity() ) {
118                         currentBuffer = ByteBuffer.allocate(nextFrameSize);
119                         currentBuffer.putInt(nextFrameSize);
120                     } else {
121                         inputBuffer.limit(nextFrameSize);
122                     }
123                     
124                 } else {
125                     currentBuffer.flip();
126                     
127                     Object JavaDoc command = wireFormat.unmarshal(new DataInputStream JavaDoc(new NIOInputStream(currentBuffer)));
128                     doConsume((Command) command);
129                     
130                     nextFrameSize=-1;
131                     inputBuffer.clear();
132                     inputBuffer.limit(4);
133                     currentBuffer = inputBuffer;
134                 }
135                 
136             }
137             
138         } catch (IOException JavaDoc e) {
139             onException(e);
140         } catch (Throwable JavaDoc e) {
141             onException(IOExceptionSupport.create(e));
142         }
143     }
144
145
146     protected void doStart() throws Exception JavaDoc {
147         connect();
148         selection.setInterestOps(SelectionKey.OP_READ);
149         selection.enable();
150     }
151
152     protected void doStop(ServiceStopper stopper) throws Exception JavaDoc {
153         selection.disable();
154         super.doStop(stopper);
155     }
156 }
157
Popular Tags