KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > core > net > TCPTransportImpl


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Uri Schneider.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.core.net;
47
48 import java.io.IOException JavaDoc;
49 import java.net.InetSocketAddress JavaDoc;
50 import java.net.SocketAddress JavaDoc;
51 import java.net.SocketException JavaDoc;
52 //import java.net.SocketException;
53
import java.nio.ByteBuffer JavaDoc;
54 import java.nio.channels.SelectableChannel JavaDoc;
55 import java.nio.channels.SocketChannel JavaDoc;
56 import java.util.LinkedList JavaDoc;
57
58 import org.apache.commons.logging.Log;
59 import org.apache.commons.logging.LogFactory;
60 import org.mr.MantaAgent;
61 import org.mr.core.util.byteable.IncomingByteBufferPool;
62
63 /**
64  * TCPTransportImpl.java
65  *
66  *
67  * Created: Wed Feb 04 15:38:19 2004
68  *
69  * @author Uri Schneider
70  * @version 1.0
71  */

72
73 public class TCPTransportImpl implements TransportImpl {
74     class OutItem {
75         OutItem(CNLMessage message, int id) {
76             this.message = message; this.id = id;
77         }
78         CNLMessage message;
79         int id;
80     }
81
82     protected SocketChannel JavaDoc channel;
83     private ByteBuffer JavaDoc lengthBuf;
84     private ByteBuffer JavaDoc messageBuf;
85     private int bytesRead;
86     private int readState;
87     private int channelState;
88     private CNLMessage message;
89     protected Log log;
90     private boolean logExt;
91     private NetworkListener listener;
92
93     private LinkedList JavaDoc outQueue;
94     private CNLMessage outCNL;
95     private ByteBuffer JavaDoc[] outBuffers;
96     private int outIndex;
97     private NetworkSelector selector;
98
99     
100     private static final int READ_STATE_HEADER = 0;
101     private static final int READ_STATE_PAYLOAD = 1;
102
103     private static final int CHANNEL_STATE_DOWN = 0;
104     private static final int CHANNEL_STATE_CONNECTING = 1;
105     private static final int CHANNEL_STATE_CONNECTED = 2;
106     private static final int CHANNEL_STATE_UP = 3;
107
108     public TCPTransportImpl(SocketChannel JavaDoc channel) {
109         this.channel = channel;
110         commonInit();
111     } // TCPTransportImpl constructor
112

113     public TCPTransportImpl(SocketAddress JavaDoc local, SocketAddress JavaDoc remote)
114         throws IOException JavaDoc
115     {
116         this.channel = SocketChannel.open();
117         if(NetworkManager.isTcpNoDelay()){
118             this.channel.socket().setTcpNoDelay(true);
119         }
120         if (local != null) {
121             this.channel.socket().bind(local);
122         }
123         this.channel.configureBlocking(false);
124         try {
125             this.channel.connect(remote);
126         } catch (IOException JavaDoc e) {
127             LogFactory.getLog("TCPTransportImpl").error("Error connecting to " + remote.toString() + ": " + e.toString());
128             this.channelState = CHANNEL_STATE_DOWN;
129             throw e;
130         }
131         commonInit();
132     }
133
134     private void commonInit() {
135         this.lengthBuf = ByteBuffer.allocate(CNLMessage.CNL_HEADERLEN);
136         this.log = LogFactory.getLog("TCPTransportImpl");
137         this.logExt = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getBooleanProperty("log-ext.enable", false);
138
139         this.outQueue = new LinkedList JavaDoc();
140         this.outCNL = null;
141         this.outBuffers = new ByteBuffer JavaDoc[3];
142         this.outBuffers[0] = ByteBuffer.allocate(CNLMessage.CNL_HEADERLEN);
143         this.outIndex = 0;
144         this.selector = null;
145
146         if (this.channel.isConnected()) {
147             this.channelState = CHANNEL_STATE_CONNECTED;
148         } else {
149             this.channelState = CHANNEL_STATE_CONNECTING;
150         }
151         if(NetworkManager.isTcpNoDelay()){
152             try {
153                 this.channel.socket().setTcpNoDelay(true);
154             } catch (SocketException JavaDoc e) {
155                 if(log.isErrorEnabled()){
156                     log.error("Cannot set TCP_NODELAY option on " + toString()+".");
157                 }
158             }
159             
160         }
161         
162         reset();
163     }
164
165     /* (non-Javadoc)
166      * @see org.mr.core.net.TransportImpl#shutdown()
167      */

168     public void shutdown() {
169         try {
170             if(log.isInfoEnabled()){
171                 log.info("SHUTTING DOWN IMPL " + toString()+".");
172             }
173             this.channel.close();
174             this.channelState = CHANNEL_STATE_DOWN;
175         } catch (IOException JavaDoc e) {
176             if(log.isWarnEnabled())
177                 log.warn("Could not close channel: " + e.toString()+".");
178         }
179         synchronized (this.outQueue) {
180             while (!this.outQueue.isEmpty()) {
181                 OutItem item = (OutItem) this.outQueue.removeFirst();
182                 item.message.unuse();
183             }
184             if (this.outCNL != null) {
185                 this.outCNL.unuse();
186             }
187         }
188         if (this.listener != null) {
189             this.listener.implShutdown();
190         }
191     }
192
193     /* (non-Javadoc)
194      * @see org.mr.core.net.TransportImpl#read()
195      */

196     public void read() {
197         int nBytes;
198
199         try {
200             if (this.readState == READ_STATE_HEADER) {
201                 nBytes = this.channel.read(lengthBuf);
202                 if (nBytes == -1) {
203                     if(log.isWarnEnabled()) {
204                         log.warn("Channel " + toString() +
205                                  " EOF. Shutting down.");
206                     }
207                     shutdown();
208                 }
209                 this.bytesRead += nBytes;
210                 if (this.bytesRead == CNLMessage.CNL_HEADERLEN) {
211                     headerComplete();
212                 }
213             } else if (this.readState == READ_STATE_PAYLOAD) {
214                 nBytes = this.channel.read(this.messageBuf);
215                 if (nBytes == -1) {
216                     if(log.isWarnEnabled()) {
217                         log.warn("Channel " + toString() +
218                                  " EOF. Shutting down.");
219                     }
220                     shutdown();
221                 }
222                 if (!this.messageBuf.hasRemaining()) {
223                     messageComplete();
224                 }
225             }
226         } catch (IOException JavaDoc e) {
227             if(log.isWarnEnabled())
228                 log.warn("Error reading from channel (remote = " +
229                          channel.socket().getRemoteSocketAddress().toString() +
230                          "):" + e.getMessage());
231             shutdown();
232         }
233         this.listener.activityDetected();
234     }
235
236     /* (non-Javadoc)
237      * @see org.mr.core.net.TransportImpl#write(java.nio.ByteBuffer)
238      */

239     public void write(CNLMessage msg, int id, NetworkSelector selector) {
240         if(log.isTraceEnabled() && logExt){
241             log.trace("Sending message(" + id + ") to " + toString()+".");
242         }
243         msg.use();
244         if (this.selector == null) {
245             this.selector = selector;
246         }
247         synchronized (this.outQueue) {
248             OutItem item = new OutItem(msg, id);
249             this.outQueue.addLast(item);
250             if (this.outCNL == null) {
251                 prepareOutgoingMessage();
252                 this.selector.addImplForWrite(this);
253             }
254         }
255     }
256
257     private void prepareOutgoingMessage() {
258         OutItem item = (OutItem) this.outQueue.removeFirst();
259         ByteBuffer JavaDoc[] payloadBuffers = item.message.valueAsBuffers();
260
261         // payloadBuffers may contain 1 or 2 buffers, depending on
262
// whether it's a network message or a manta message. hence
263
// the ugliness. see also selectWrite().
264
outgoingMessageHook(item.message);
265         this.outCNL = item.message;
266         this.outBuffers[0].clear();
267         this.outBuffers[2] = null;
268         this.outCNL.headerToBuffer(this.outBuffers[0], item.id);
269         for (int i = 0; i < payloadBuffers.length; i++) {
270             this.outBuffers[i+1] = payloadBuffers[i];
271         }
272         this.outIndex = 0;
273     }
274
275     /**
276      * This method enables subclasses (e.g. MWBTransportImpl) to
277      * manipulate an outgoing message before it is being processed.
278      */

279     protected void outgoingMessageHook(CNLMessage message) {}
280
281     public void selectWrite() {
282         try {
283             while (true) {
284                 while (this.outIndex < this.outBuffers.length) {
285                     ByteBuffer JavaDoc buf = this.outBuffers[this.outIndex];
286                     if (buf == null) {
287                         break;
288                     }
289                     int nBytes = this.channel.write(buf);
290                     this.listener.activityDetected();
291                     if (buf.remaining() > 0) {
292                         // write would block - so get out
293
return;
294                     }
295                     this.outIndex++;
296                 }
297                 this.outCNL.setSent();
298                 this.outCNL.unuse();
299                 synchronized (this.outQueue) {
300                     if (this.outQueue.isEmpty()) {
301                         // no more messages - so get out
302
this.selector.removeImplForWrite(this);
303                         this.outCNL = null;
304                         return;
305                     } else {
306                         prepareOutgoingMessage();
307                     }
308                 }
309             }
310         } catch (IOException JavaDoc e) {
311             if(log.isErrorEnabled())
312                 log.error("Error writing to " + toString() + ": " +
313                              e.toString()+".");
314             shutdown();
315         }
316     }
317
318     /* (non-Javadoc)
319      * @see org.mr.core.net.TransportImpl#isInitialized()
320      */

321     public boolean isInitialized() {
322         return this.channelState == CHANNEL_STATE_UP;
323     }
324
325     /* (non-Javadoc)
326      * @see org.mr.core.net.TransportImpl#setInitialized()
327      */

328     public void setInitialized() {
329         this.channelState = CHANNEL_STATE_UP;
330     }
331
332     /* (non-Javadoc)
333      * @see org.mr.core.net.TransportImpl#isDown()
334      */

335     public boolean isDown() {
336         return this.channelState == CHANNEL_STATE_DOWN;
337     }
338
339     /* (non-Javadoc)
340      * @see org.mr.core.net.TransportImpl#getChannel()
341      */

342     public SelectableChannel JavaDoc getChannel() {
343         return this.channel;
344     }
345
346     /* (non-Javadoc)
347      * @see org.mr.core.net.TransportImpl#getType()
348      */

349     public TransportType getType() {
350         return TransportType.TCP;
351     }//TransportType
352

353     private void headerComplete() {
354         this.lengthBuf.flip();
355         this.message.readHeader(this.lengthBuf);
356         this.messageBuf =
357             IncomingByteBufferPool.getInstance().getBuffer(this.message.getLength());
358         this.messageBuf.limit(this.message.getLength());
359         this.readState = READ_STATE_PAYLOAD;
360     }
361
362     private void messageComplete() {
363         this.messageBuf.flip();
364         this.message.setBuffer(this.messageBuf);
365
366         if(log.isTraceEnabled() && logExt){
367             log.trace("Received message(" + this.message.getID() +
368                       ") from " + toString()+".");
369         }
370
371         this.message.setSourceAddress(this.channel.socket().getRemoteSocketAddress());
372         this.message.setDestAddress(this.channel.socket().getLocalSocketAddress());
373         this.listener.messageReady(this.message);
374
375         reset();
376     }
377
378     private void reset() {
379         this.bytesRead = 0;
380         this.readState = READ_STATE_HEADER;
381         this.message = new CNLMessage(true);
382         this.lengthBuf.clear();
383     }
384
385     /* (non-Javadoc)
386      * @see org.mr.core.net.TransportImpl#isConnected()
387      */

388     public boolean isConnected() {
389         return this.channel != null && this.channel.isConnected();
390     }
391
392     /* (non-Javadoc)
393      * @see org.mr.core.net.TransportImpl#setListener(org.mr.core.net.NetworkListener)
394      */

395     public void setListener(NetworkListener listener) {
396         this.listener = listener;
397     }
398
399     /* (non-Javadoc)
400      * @see java.lang.Object#toString()
401      */

402     public String JavaDoc toString() {
403         StringBuffer JavaDoc buf = new StringBuffer JavaDoc();
404         try {
405             buf.append(this.channel.socket().getLocalSocketAddress().
406                        toString());
407             buf.append(this.channel.socket().getRemoteSocketAddress().
408                        toString());
409         } catch (Throwable JavaDoc t) {
410             buf.append("/unknown/unknown");
411         }
412         buf.append("@TCP");
413
414         return buf.toString();
415     }
416     
417     public void onConnect() {}
418
419     public InetSocketAddress JavaDoc getLocalSocketAddress() {
420         return (InetSocketAddress JavaDoc)
421             this.channel.socket().getLocalSocketAddress();
422     }
423
424     public InetSocketAddress JavaDoc getRemoteSocketAddress() {
425         return (InetSocketAddress JavaDoc)
426             this.channel.socket().getRemoteSocketAddress();
427     }
428 } // TCPTransportImpl
429
Popular Tags