KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > core > net > Transport


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Uri Schneider (urish31 at users.sourceforge.net).
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.core.net;
47
48 /**
49  * Transport.java
50  *
51  * An object encapsulating a connection to a remote agent, using a
52  * specific protocol. It uses {@link TransportImpl} objects for the
53  * actual I/O operations.
54  *
55  * Created: Mon Jan 19 20:07:35 2004
56  *
57  * @author Uri Schneider (urish31 at users.sourceforge.net)
58  * @version 1.0
59  */

60
61 import java.io.IOException JavaDoc;
62 import java.io.UnsupportedEncodingException JavaDoc;
63 import java.net.InetAddress JavaDoc;
64 import java.net.InetSocketAddress JavaDoc;
65 import java.nio.ByteBuffer JavaDoc;
66 import java.nio.channels.SelectableChannel JavaDoc;
67 import java.nio.channels.SocketChannel JavaDoc;
68
69 import java.util.Iterator JavaDoc;
70 import java.util.LinkedList JavaDoc;
71 import java.util.List JavaDoc;
72
73 import org.mr.MantaAgent;
74 import org.mr.core.configuration.ConfigManager;
75 import org.apache.commons.logging.Log;
76 import org.apache.commons.logging.LogFactory;
77 import org.mr.core.net.messages.NetworkMessage;
78 import org.mr.core.net.messages.NetworkMessageID;
79 import org.mr.core.net.messages.NetworkMessageKeepalive;
80 import org.mr.core.stats.StatManager;
81 import org.mr.core.stats.TemporalCounter;
82 import org.mr.core.util.SystemTime;
83 import org.mr.core.util.byteable.IncomingByteBufferPool;
84
85 public class Transport implements NetworkListener {
86     class BacklogItem {
87         public CNLMessage cnlMessage;
88
89         public BacklogItem(CNLMessage cnlMessage) {
90             this.cnlMessage = cnlMessage;
91         }
92     }
93
94     private TransportInfo info;
95 // private HashMap sendImpls;
96
private TransportImpl theImpl;
97 // private ArrayList receiveImpls;
98
private Log log;
99     private LinkedList JavaDoc backlog;
100     private String JavaDoc myAgentName;
101     private String JavaDoc remoteAgentName;
102 // private NetworkManager manager;
103
private NetworkListener listener;
104     private StatManager statManager;
105     private long totalBytes;
106     private long totalMessages;
107     private TemporalCounter fiveMinBytes;
108     private TemporalCounter fiveMinMessages;
109     private int lastIdSent;
110     private NetworkSelector selector;
111     private boolean sentInit;
112     private boolean passive;
113
114     // keepalive stuff
115
private static final long MAX_CONNECT_INTERVAL = 300000; // millis
116
private static final long MIN_CONNECT_INTERVAL = 3000; // millis
117
private static final double CONNECT_INTERVAL_FACTOR = 1.2;
118
119     private boolean debugKeepalive;
120     protected TransportStateListener stateListener;
121     private long lastSent; // millis
122
private long lastReceived; // millis
123
private long lastMantaMessage; // millis
124
private long lastConnect; // millis
125
private long connectInterval; // millis
126
private int keepaliveInterval; // secs
127
private int deadFactor; // multiplier of
128
// keepaliveInterval
129
private short connectionTTL; // minutes
130
private boolean isTTLExpired;
131
132     /**
133      * Create a new Transport object. At creation, the object will not have
134      * any TransportImpl objects. They are created as needed, or attached to
135      * the Transport when incoming new channels are accepted.
136      * @param info - describing the Transport's connection parameters
137      * @param localAddresses - a (possibly null) list of local addresses.
138      * a list with more than one address will cause the Transport to be a
139      * multi-send transport.
140      * @param myAgentName - this agent's name. used when sending an ID message
141      * to the other side.
142      * @param listener - to report incoming messages
143      * @param statManager - to report byte and message count
144      * @param selector - for asynchronous write
145      * @param passive - this means that the transport is only used as
146      * a placeholder. It will never actively connect to the remote
147      * address. This is used by the IRS.
148      */

149     public Transport(TransportInfo info, String JavaDoc myAgentName,
150                      String JavaDoc remoteAgentName, NetworkListener listener,
151                      StatManager statManager,
152                      NetworkSelector selector, boolean passive) {
153         this.info = info;
154         this.log = LogFactory.getLog("Transport");
155         this.backlog = new LinkedList JavaDoc();
156         this.myAgentName = myAgentName;
157         this.remoteAgentName = remoteAgentName;
158         this.listener = listener;
159         this.statManager = statManager;
160         this.totalMessages = 0;
161         this.totalBytes = 0;
162         this.fiveMinMessages = new TemporalCounter(5000, 60);
163         this.fiveMinBytes = new TemporalCounter(5000, 60);
164         this.selector = selector;
165
166         this.lastIdSent = 0;
167
168         this.theImpl = null;
169         this.sentInit = false;
170         this.passive = passive;
171         // keepalive stuff
172
this.stateListener = null;
173         this.lastSent = -1;
174         this.lastReceived = -1;
175         this.lastMantaMessage = SystemTime.currentTimeMillis();
176         this.lastConnect = -1;
177         this.connectInterval = MIN_CONNECT_INTERVAL;
178         ConfigManager config = MantaAgent.getInstance().getSingletonRepository().getConfigManager();
179         this.deadFactor = config.getIntProperty("network.keepalive.deadfactor", 20);
180         this.keepaliveInterval = config.getIntProperty("network.keepalive.interval", 1);
181         this.debugKeepalive = config.getBooleanProperty("network.keepalive.debug", false);
182
183         // configuration item keepalive.connectionTTL is specified in minutes:
184
this.connectionTTL =
185             config.getShortProperty("network.keepalive.connectionTTL", (short) 5);
186         this.isTTLExpired = false;
187
188     } // Transport constructor
189

190     /**
191      * Get the Info value.
192      * @return the Info value.
193      */

194     public TransportInfo getInfo() {
195         return info;
196     }
197
198     /**
199      * Set the Info value.
200      * @param newInfo The new Info value.
201      */

202     public void setInfo(TransportInfo newInfo) {
203         this.info = newInfo;
204     }
205
206     /**
207      * create and connect this Transport's impl objects.
208      * @throws IOException
209      */

210     public void createImpls() throws IOException JavaDoc {
211         boolean finishConnecting = false;
212         try {
213             synchronized(this) {
214                 long now = SystemTime.currentTimeMillis();
215                 this.lastConnect = now;
216                 this.lastMantaMessage = now;
217                 this.isTTLExpired = false;
218
219                 if (theImpl == null || theImpl.isDown()) {
220 // InetAddress local = getValidLocalAddress();
221
theImpl = TransportProvider.
222                         createImpl(this.info.getTransportInfoType(),
223                                    null,
224                                    this.info.getSocketAddress(),
225                                    this.remoteAgentName, this);
226
227                     if (this.theImpl != null) {
228                         theImpl.setListener(this);
229                         if (theImpl.isConnected()) {
230                             finishConnecting = true;
231                         } else {
232                             this.selector.addTransportImpl(theImpl, this);
233                         }
234                     }
235                 }
236             }
237         } catch (IOException JavaDoc e) {
238             if (this.stateListener != null) {
239                 this.stateListener.transportDown(this);
240             }
241             throw e;
242         }
243         if (finishConnecting) {
244             finishedConnecting(this.theImpl);
245         }
246     }
247
248     /**
249      * try to call createImpls(), only enough time has passed since
250      * the last connect.
251      */

252     public void createImplsMaybe() {
253         long now = SystemTime.currentTimeMillis();
254         boolean tryCreate = false;
255         synchronized (this) {
256             if ((now - lastConnect) >= connectInterval) {
257                 tryCreate = true;
258                 connectInterval = (long)
259                     (CONNECT_INTERVAL_FACTOR * connectInterval);
260                 if (connectInterval > MAX_CONNECT_INTERVAL) {
261                     connectInterval = MAX_CONNECT_INTERVAL;
262                 }
263             }
264         }
265         if (tryCreate) {
266             try {
267                 createImpls();
268             } catch (IOException JavaDoc e) {}
269         }
270     }
271
272     private void sendInit(TransportImpl impl) {
273         try {
274             sendNetworkMessage(new NetworkMessageID(!this.sentInit,
275                                                     myAgentName),
276                                impl);
277             this.sentInit = true;
278         } catch (UnsupportedEncodingException JavaDoc e) {}
279     }
280
281     private void sendKeepalive(long now) {
282         this.lastSent = now;
283         NetworkMessageKeepalive ka =
284             new NetworkMessageKeepalive(this.keepaliveInterval,
285                                         this.connectionTTL);
286         if (this.theImpl != null && this.theImpl.isConnected()) {
287             sendNetworkMessage(ka, this.theImpl);
288         }
289         if (debugKeepalive){
290             log.info("Keep Alive lastSent = " + this.lastSent);
291         }
292     }
293
294     /**
295      * Whether this transport has a connected channel
296      *
297      * @return a <code>boolean</code> value
298      */

299     public synchronized boolean isConnected() {
300         return this.theImpl != null && this.theImpl.isConnected();
301     }
302
303     /**
304      * Whether this transport is initialized. Only initialized
305      * transports can send applicative messages.
306      *
307      * @return a <code>boolean</code> value
308      */

309     public synchronized boolean isInitialized() {
310         return this.theImpl != null && this.theImpl.isInitialized();
311     }
312
313     public synchronized boolean isDown() {
314         return this.theImpl == null || this.theImpl.isDown();
315     }
316
317     public void setInitialized(InetAddress JavaDoc local, boolean initId)
318     {
319         boolean wasInitialized = isInitialized();
320         synchronized(this) {
321             
322             if (this.theImpl != null) {
323                 this.theImpl.setInitialized();
324                 sendBacklog();
325             }
326             this.connectInterval = MIN_CONNECT_INTERVAL;
327         }
328         if (this.stateListener != null && !wasInitialized) {
329             this.stateListener.transportUp(this);
330         }
331     }
332
333     public void finishedConnecting(SocketChannel JavaDoc channel) {
334         boolean notifyListener = false;
335         synchronized(this) {
336             SelectableChannel JavaDoc existing = this.theImpl.getChannel();
337             if (existing == channel) {
338                 if (this.theImpl.isConnected()) {
339                     // register the impl again, this time for read
340
this.selector.addTransportImpl(this.theImpl, this);
341                     this.theImpl.onConnect();
342                     sendInit(this.theImpl);
343                     // CDP impls are immediatelay initialized - so check
344
// if (impl.isInitialized() && this.stateListener != null) {
345
// notifyListener = true;
346
// this.stateListener.transportUp(this);
347
// }
348
} else {
349                     this.theImpl.shutdown();
350                     if (isDown()) {
351                         if(log.isWarnEnabled()){
352                             log.warn("Transport " + getInfo().toString() +
353                                      " can't connect. Clearing backlog (" +
354                                      this.backlog.size() +
355                                      " messages deleted)");
356                         }
357                         clearBacklog();
358                         if (this.stateListener != null) {
359                             notifyListener = true;
360                         }
361                     }
362                 }
363             }
364         }
365         if (notifyListener) {
366             this.stateListener.transportDown(this);
367         }
368     }
369
370     void finishedConnecting(TransportImpl impl) {
371         boolean notifyListener = false;
372         synchronized(this) {
373             if (impl.isConnected()) {
374                 // register the impl again, this time for read
375
this.selector.addTransportImpl(impl, this);
376                 impl.onConnect();
377                 sendInit(impl);
378                 // CDP impls are immediatelay initialized - so check
379
// if (impl.isInitialized() && this.stateListener != null) {
380
// notifyListener = true;
381
// this.stateListener.transportUp(this);
382
// }
383
} else {
384                 impl.shutdown();
385                 if (isDown()) {
386                     if(log.isWarnEnabled()){
387                         log.warn("Transport " + getInfo().toString() +
388                                  " can't connect. Clearing backlog (" +
389                                  this.backlog.size() + " messages deleted)");
390                     }
391                     clearBacklog();
392                     if (this.stateListener != null) {
393                         notifyListener = true;
394                     }
395                 }
396             }
397         }
398         if (notifyListener) {
399             this.stateListener.transportDown(this);
400         }
401     }
402
403     private void clearBacklog() {
404         synchronized (this.backlog) {
405             Iterator JavaDoc i = this.backlog.iterator();
406             while (i.hasNext()) {
407                 BacklogItem item = (BacklogItem) i.next();
408                 item.cnlMessage.unuse();
409             }
410             this.backlog.clear();
411         }
412     }
413
414     private void sendBacklog() {
415         synchronized (this.backlog) {
416             while (!this.backlog.isEmpty()) {
417                 BacklogItem b = (BacklogItem) this.backlog.removeFirst();
418                 realSendMantaMessage(b.cnlMessage);
419             }
420         }
421     }
422
423     private void sendBuffer(CNLMessage message, int id, TransportImpl impl)
424         throws IOException JavaDoc
425     {
426         impl.write(message, id, this.selector);
427     }
428
429     private int sendBuffer(CNLMessage msg, int id)
430         throws IOException JavaDoc
431     {
432         if (this.theImpl != null && this.theImpl.isInitialized()) {
433             try {
434                 sendBuffer(msg, id, this.theImpl);
435             } catch (IOException JavaDoc e) {
436                 this.theImpl.shutdown();
437                 if (this.stateListener != null) {
438                     this.stateListener.transportDown(this);
439                 }
440                 throw e;
441             }
442
443             this.lastSent = SystemTime.currentTimeMillis();
444
445             if (debugKeepalive)
446                 log.info("Keep Alive lastSent = " + this.lastSent);
447
448             return 1;
449         } else
450             return 0;
451     }
452
453     /**
454      * shutdown all TransportImpl objects.
455      */

456     public void shutdown() {
457         if (this.theImpl != null) {
458             this.theImpl.shutdown();
459         }
460     }
461
462     /**
463      * Send a manta (applicative) message through this transport. This
464      * method will report send errors using the provided listener. If this
465      * Transport has no initialized impls, the message will be saved and sent
466      * whenever the impls are available.
467      * @param cnlMessage
468      * @param message
469      * @param addresses
470      * @param listener
471      */

472     public void sendMantaMessage(CNLMessage cnlMessage) {
473         cnlMessage.use();
474         if (!isInitialized()) {
475             synchronized (this.backlog) {
476                 this.backlog.add(new BacklogItem(cnlMessage));
477             }
478         } else {
479             realSendMantaMessage(cnlMessage);
480         }
481     }
482
483     private void realSendMantaMessage(CNLMessage cnlMessage) {
484         try {
485             int messageSize;
486             synchronized (this) {
487                 sendBuffer(cnlMessage, getSendId());
488                 messageSize = cnlMessage.getTotalLength();
489                 this.totalMessages++;
490                 this.totalBytes += messageSize;
491                 this.fiveMinMessages.addSample(1);
492                 this.fiveMinBytes.addSample(messageSize);
493                 this.lastMantaMessage = SystemTime.currentTimeMillis();
494             }
495             this.statManager.addMessageSample(messageSize);
496         } catch (IOException JavaDoc e) {
497             if(log.isErrorEnabled()){
498                 log.error("Cannot write Manta message to " + this.info.getSocketAddress().toString() + " (" + e.toString() + ")");
499             }
500         }
501
502         cnlMessage.unuse();
503     }
504
505     private void sendNetworkMessage(NetworkMessage msg, TransportImpl impl) {
506         int len = NetworkMessage.NET_HEADERLEN + msg.getLength();
507         ByteBuffer JavaDoc buf = ByteBuffer.allocate(len);
508         buf.limit(len);
509         msg.write(buf);
510         ByteBuffer JavaDoc[] buffers = new ByteBuffer JavaDoc[1];
511         buffers[0] = buf;
512         CNLMessage cnlMsg = new CNLMessage(CNLMessage.CNL_NETWORK, buffers);
513
514         try {
515             cnlMsg.use();
516             sendBuffer(cnlMsg, getSendId(), impl);
517             cnlMsg.unuse();
518         } catch (IOException JavaDoc e) {
519             if(log.isErrorEnabled())
520                 log.error("Cannot write network message to " +
521                              this.info.getSocketAddress().toString());
522         }
523     }
524
525     private synchronized int getSendId() {
526         if (this.lastIdSent == Integer.MAX_VALUE) {
527             this.lastIdSent = 0;
528         }
529
530         this.lastIdSent++;
531
532         return this.lastIdSent;
533     }
534
535     /**
536      * Incorporate the new impl into this transport. The new impl can be
537      * primary, i.e. used for both sending and receiving. In this case, a
538      * check will be made to determine if an existing impl already exists.
539      * The impl can be secondary, i.e. used only for receiving (the other side
540      * is a multi-send Transport, and only one of its impls is used by this
541      * Transport to send messages), in which case the impl will be added to
542      * the receiveImpls table.
543      * @param newImpl
544      */

545     public void mergeImpl(TransportImpl newImpl, boolean initId) {
546         boolean notifyListener = false;
547         synchronized(this) {
548             this.lastMantaMessage = SystemTime.currentTimeMillis();
549             this.lastReceived = this.lastMantaMessage;
550             newImpl.setListener(this);
551             TransportImpl oldImpl = this.theImpl;
552             if(log.isDebugEnabled()){
553                 log.debug("mergeImpl(): old = " +
554                           (oldImpl == null ? "null" : oldImpl.toString()) +
555                           "; new = " + newImpl.toString());
556             }
557             if (oldImpl == null || !oldImpl.isConnected()) {
558                 this.theImpl = newImpl;
559                 sendInit(newImpl);
560                 newImpl.setInitialized();
561                 if (this.stateListener != null) {
562                     notifyListener = true;
563                 }
564             } else {
565                 String JavaDoc srcOld = oldImpl.getLocalSocketAddress().toString();
566                 String JavaDoc dstNew = newImpl.getRemoteSocketAddress().toString();
567                 if (srcOld.compareTo(dstNew) > 0) {
568                     // choose old
569
if(log.isDebugEnabled()){
570                         log.debug("mergeImpl(): shut down new impl.");
571                     }
572                     newImpl.shutdown();
573                 } else {
574                     // choose new
575
oldImpl.shutdown();
576                     this.theImpl = newImpl;
577                     sendInit(newImpl);
578                     newImpl.setInitialized();
579                     if (this.stateListener != null) {
580                         notifyListener = true;
581                     }
582                     if(log.isDebugEnabled()){
583                         log.debug("mergeImpl(): shut down old impl.");
584                     }
585                 }
586             }
587             sendBacklog();
588         }
589         if (notifyListener) {
590             this.stateListener.transportUp(this);
591         }
592     }
593
594
595     /* (non-Javadoc)
596      * @see org.mr.core.net.NetworkListener#acceptedChannel(java.nio.channels.SocketChannel)
597      */

598     public void acceptedChannel(SocketChannel JavaDoc channel) {
599         if(log.isErrorEnabled()){
600             log.error("acceptedChannel(channel) is not implemented by Transport");
601         }
602     }
603
604     public void acceptedImpl(TransportImpl impl) {
605         if(log.isErrorEnabled()){
606             log.error("acceptedImpl(impl) is not implemented by Transport");
607         }
608     }
609
610     /* (non-Javadoc)
611      * @see org.mr.core.net.NetworkListener#messageReady(java.nio.channels.SocketChannel, org.mr.core.net.CNLMessage)
612      */

613     public void messageReady(CNLMessage message) {
614         boolean sendToListener = false;
615         synchronized(this) {
616             int id = message.getID();
617             // Amir S: i have added the id < 2 after noticing that there
618
// are cases where the agent is down and up again and then the
619
// id is 2 and is treated as "OLD" discarding duplicate
620
// if (id > this.lastIdReceived ||
621
// (this.lastIdReceived == Integer.MAX_VALUE && id == 1)
622
// || id <=2) {
623
if (!checkForKeepalive(message)) {
624                     this.lastMantaMessage = SystemTime.currentTimeMillis();
625 // this.listener.messageReady(message);
626
sendToListener = true;
627                 } else {
628                     IncomingByteBufferPool.getInstance().
629                         release(message.buffers()[0]);
630                 }
631 // } else {
632
// IncomingByteBufferPool.getInstance().
633
// release(message.buffers()[0]);
634
// if(log.isWarnEnabled()) {
635
// log.warn("Discarding duplicate message from " +
636
// message.getSourceAddress() + " (id = " + id
637
// + ")");
638
// }
639
// }
640

641             this.lastReceived = SystemTime.currentTimeMillis();
642         }
643         if (sendToListener) {
644             this.listener.messageReady(message);
645         }
646     }
647
648     private boolean checkForKeepalive(CNLMessage message) {
649         if (message.getType() != CNLMessage.CNL_NETWORK) {
650             return false;
651         }
652         ByteBuffer JavaDoc buf = message.valueAsBuffers()[0];
653         try {
654             NetworkMessageKeepalive keepalive = (NetworkMessageKeepalive)
655                 NetworkMessage.create(buf, false, null, null);
656             if (keepalive == null) {
657                 return false;
658             }
659             if (debugKeepalive)
660                 log.info("Keep Alive[" + this.info.toString() +
661                          "] received keep alive");
662             if (keepalive.getInterval() < this.keepaliveInterval) {
663                 this.keepaliveInterval = keepalive.getInterval();
664             }
665             if (keepalive.getConnectionTTL() > this.connectionTTL) {
666                 this.connectionTTL = keepalive.getConnectionTTL();
667             }
668
669             long now = SystemTime.currentTimeMillis();
670             if ((now - this.lastSent) >= (this.keepaliveInterval * 1000)) {
671                 if (debugKeepalive)
672                     log.info("Keep Alive[" + this.info.toString() +
673                              "] sending response keep alive (now = " + now +
674                              "; lastS = " + lastSent + ")");
675                 sendKeepalive(now);
676             }
677         } catch (ClassCastException JavaDoc e) {
678             return false;
679         }
680         return true;
681     }
682     /*
683     private InetAddress getValidLocalAddress() {
684         try {
685             Enumeration ifs = NetworkInterface.getNetworkInterfaces();
686             while (ifs.hasMoreElements()) {
687                 NetworkInterface iface = (NetworkInterface) ifs.nextElement();
688                 Enumeration ips = iface.getInetAddresses();
689                 while (ips.hasMoreElements()) {
690                     InetAddress ip = (InetAddress) ips.nextElement();
691                     if (!ip.getHostAddress().equals("127.0.0.1")) {
692                         return ip;
693                     }
694                 }
695             }
696         } catch (Throwable t) {}
697
698         if(log.isInfoEnabled()){
699             log.info("getValidLocalAddress(): couldn't find any, " +
700                         "returning 0.0.0.0");
701         }
702         try {
703             return InetAddress.getByName("0.0.0.0");
704         } catch (UnknownHostException e) {}
705         
706         return null; // won't happen, trust me.
707     }
708     */

709     public synchronized List JavaDoc getConnectedImpls() {
710         LinkedList JavaDoc result = new LinkedList JavaDoc();
711
712         if (this.theImpl != null && this.theImpl.isConnected()) {
713             result.add(this.theImpl);
714         }
715         return result;
716     }
717
718     public long getTotalMessages() {
719         return this.totalMessages;
720     }
721
722     public long getTotalBytes() {
723         return this.totalBytes;
724     }
725
726     public long getFiveMinMessages() {
727         return this.fiveMinMessages.getValue();
728     }
729
730     public long getFiveMinBytes() {
731         return this.fiveMinBytes.getValue();
732     }
733
734     public void setStateListener(TransportStateListener listener) {
735         this.stateListener = listener;
736     }
737
738     /**
739      * If this transport's remote agent is being monitored, this
740      * method is called by the {@link AgentMonitorManager}
741      * periodically.<p> It checks for incoming traffic through this
742      * transport, and {@link #shutdown shuts down} the transport if no
743      * incoming messages has been received for a (configurable) while.
744      * It also sends a keepalive message if no ougoing messages were
745      * sent for a (configurable) while.
746      */

747     public void keepalive() {
748         boolean shouldShutdown = false;
749         synchronized(this) {
750             long now = SystemTime.currentTimeMillis();
751             if (this.lastReceived == -1) {
752                 this.lastReceived = now;
753             }
754
755             if (isInitialized()) {
756                 if (debugKeepalive)
757                     log.info("Keep Alive[" + this.info.toString() +
758                              "] sending timer keep alive (now = " + now +
759                              "; lastS = " + lastSent + ")");
760                 sendKeepalive(now);
761
762                 long deadInterval =
763                     (long) this.keepaliveInterval * this.deadFactor * 1000;
764                 // no overflow for you!
765
if (deadInterval < 0) {
766                     deadInterval = Long.MAX_VALUE;
767                 }
768                 if ((now - this.lastReceived) >= deadInterval) {
769                     if (log.isInfoEnabled())
770                         log.info("Keep Alive[" + this.info.toString() +
771                                  "] disconnect (now = " + now +
772                                  "; lastR = " + this.lastReceived + ")");
773                     shouldShutdown = true;
774 // shutdown();
775
}
776                 if ((now - this.lastMantaMessage) >=
777                     (this.connectionTTL * 60000 + 60000)) {
778                     if(log.isInfoEnabled()){
779                         log.info("Keep Alive[" + this.info.toString() +
780                                  "] connection TTL expired (now = " + now +
781                                  "; lastManta = " + this.lastMantaMessage +
782                                  ")");
783                     }
784                     this.isTTLExpired = true;
785                     shouldShutdown = true;
786 // shutdown();
787
}
788             } else if (isDown() && !isTTLExpired() && !isPassive()) {
789 // if ((now - this.lastMantaMessage) >= (this.connectionTTL * 60000) &&
790
// this.connectInterval == MIN_CONNECT_INTERVAL) {
791
// if (this.log.isInfoEnabled()) {
792
// this.log.info("Last disconnect was due to TTL. " +
793
// "Nothing to worry about.");
794
// }
795
// this.isTTLExpired = true;
796
// System.out.println("keepalive: ttl passive = " +
797
// this.isTTLExpired);
798
// } else
799
createImplsMaybe();
800 // if ((now - lastConnect) >= connectInterval) {
801
// try {
802
// createImpls();
803
// } catch (IOException e) {}
804
// if(this.debugKeepalive)
805
// log.info("Keep Alive[" + this.info.toString() +
806
// "] try connect (next not before " +
807
// connectInterval + ")");
808
// connectInterval = (long)
809
// (CONNECT_INTERVAL_FACTOR * connectInterval);
810
// if (connectInterval > MAX_CONNECT_INTERVAL) {
811
// connectInterval = MAX_CONNECT_INTERVAL;
812
// }
813
// }
814
}
815         }
816         if (shouldShutdown) {
817             shutdown();
818         }
819 // if (notifyListener) {
820
// this.stateListener.transportDown(this);
821
// }
822
}
823
824     public boolean isIndirect() {
825         return false;
826     }
827
828     public boolean isPassive() {
829         return this.passive;
830     }
831
832     private boolean isTTLExpired() {
833         return this.isTTLExpired;
834     }
835
836     /* (non-Javadoc)
837      * @see org.mr.core.net.NetworkListener#activityDetected()
838      */

839     public void activityDetected() {
840         this.lastReceived = SystemTime.currentTimeMillis();
841     }
842
843     public void implShutdown() {
844         boolean notifyListener = false;
845         long now = SystemTime.currentTimeMillis();
846         synchronized(this) {
847             clearBacklog();
848             if ((now - this.lastMantaMessage) >= (this.connectionTTL * 60000) &&
849                 this.connectInterval == MIN_CONNECT_INTERVAL) {
850                 if (this.log.isInfoEnabled()) {
851                     this.log.info("Last disconnect was due to TTL. " +
852                                   "Nothing to worry about.");
853                 }
854                 this.isTTLExpired = true;
855             }
856             if (this.stateListener != null && !isTTLExpired()) {
857                 notifyListener = true;
858             }
859         }
860         if (notifyListener) {
861             this.stateListener.transportDown(this);
862         }
863     }
864
865     public String JavaDoc toString() {
866         return this.info.toString() + "-" + this.remoteAgentName + "-" +
867             hashCode();
868     }
869
870     public InetSocketAddress JavaDoc getLocalSocketAddress() {
871         if (this.theImpl != null && this.theImpl.isConnected()) {
872             return this.theImpl.getLocalSocketAddress();
873         } else {
874             return null;
875         }
876     }
877
878     public String JavaDoc getRemoteAgentName() {
879         return this.remoteAgentName;
880     }
881 } // Transport
882
Popular Tags