1 46 package org.mr.core.net; 47 48 60 61 import java.io.IOException ; 62 import java.io.UnsupportedEncodingException ; 63 import java.net.InetAddress ; 64 import java.net.InetSocketAddress ; 65 import java.nio.ByteBuffer ; 66 import java.nio.channels.SelectableChannel ; 67 import java.nio.channels.SocketChannel ; 68 69 import java.util.Iterator ; 70 import java.util.LinkedList ; 71 import java.util.List ; 72 73 import org.mr.MantaAgent; 74 import org.mr.core.configuration.ConfigManager; 75 import org.apache.commons.logging.Log; 76 import org.apache.commons.logging.LogFactory; 77 import org.mr.core.net.messages.NetworkMessage; 78 import org.mr.core.net.messages.NetworkMessageID; 79 import org.mr.core.net.messages.NetworkMessageKeepalive; 80 import org.mr.core.stats.StatManager; 81 import org.mr.core.stats.TemporalCounter; 82 import org.mr.core.util.SystemTime; 83 import org.mr.core.util.byteable.IncomingByteBufferPool; 84 85 public class Transport implements NetworkListener { 86 class BacklogItem { 87 public CNLMessage cnlMessage; 88 89 public BacklogItem(CNLMessage cnlMessage) { 90 this.cnlMessage = cnlMessage; 91 } 92 } 93 94 private TransportInfo info; 95 private TransportImpl theImpl; 97 private Log log; 99 private LinkedList backlog; 100 private String myAgentName; 101 private String remoteAgentName; 102 private NetworkListener listener; 104 private StatManager statManager; 105 private long totalBytes; 106 private long totalMessages; 107 private TemporalCounter fiveMinBytes; 108 private TemporalCounter fiveMinMessages; 109 private int lastIdSent; 110 private NetworkSelector selector; 111 private boolean sentInit; 112 private boolean passive; 113 114 private static final long MAX_CONNECT_INTERVAL = 300000; private static final long MIN_CONNECT_INTERVAL = 3000; private static final double CONNECT_INTERVAL_FACTOR = 1.2; 118 119 private boolean debugKeepalive; 120 protected TransportStateListener stateListener; 121 private long lastSent; private long lastReceived; private long lastMantaMessage; private long lastConnect; private long connectInterval; private int keepaliveInterval; private int deadFactor; private short connectionTTL; private boolean isTTLExpired; 131 132 149 public Transport(TransportInfo info, String myAgentName, 150 String remoteAgentName, NetworkListener listener, 151 StatManager statManager, 152 NetworkSelector selector, boolean passive) { 153 this.info = info; 154 this.log = LogFactory.getLog("Transport"); 155 this.backlog = new LinkedList (); 156 this.myAgentName = myAgentName; 157 this.remoteAgentName = remoteAgentName; 158 this.listener = listener; 159 this.statManager = statManager; 160 this.totalMessages = 0; 161 this.totalBytes = 0; 162 this.fiveMinMessages = new TemporalCounter(5000, 60); 163 this.fiveMinBytes = new TemporalCounter(5000, 60); 164 this.selector = selector; 165 166 this.lastIdSent = 0; 167 168 this.theImpl = null; 169 this.sentInit = false; 170 this.passive = passive; 171 this.stateListener = null; 173 this.lastSent = -1; 174 this.lastReceived = -1; 175 this.lastMantaMessage = SystemTime.currentTimeMillis(); 176 this.lastConnect = -1; 177 this.connectInterval = MIN_CONNECT_INTERVAL; 178 ConfigManager config = MantaAgent.getInstance().getSingletonRepository().getConfigManager(); 179 this.deadFactor = config.getIntProperty("network.keepalive.deadfactor", 20); 180 this.keepaliveInterval = config.getIntProperty("network.keepalive.interval", 1); 181 this.debugKeepalive = config.getBooleanProperty("network.keepalive.debug", false); 182 183 this.connectionTTL = 185 config.getShortProperty("network.keepalive.connectionTTL", (short) 5); 186 this.isTTLExpired = false; 187 188 } 190 194 public TransportInfo getInfo() { 195 return info; 196 } 197 198 202 public void setInfo(TransportInfo newInfo) { 203 this.info = newInfo; 204 } 205 206 210 public void createImpls() throws IOException { 211 boolean finishConnecting = false; 212 try { 213 synchronized(this) { 214 long now = SystemTime.currentTimeMillis(); 215 this.lastConnect = now; 216 this.lastMantaMessage = now; 217 this.isTTLExpired = false; 218 219 if (theImpl == null || theImpl.isDown()) { 220 theImpl = TransportProvider. 222 createImpl(this.info.getTransportInfoType(), 223 null, 224 this.info.getSocketAddress(), 225 this.remoteAgentName, this); 226 227 if (this.theImpl != null) { 228 theImpl.setListener(this); 229 if (theImpl.isConnected()) { 230 finishConnecting = true; 231 } else { 232 this.selector.addTransportImpl(theImpl, this); 233 } 234 } 235 } 236 } 237 } catch (IOException e) { 238 if (this.stateListener != null) { 239 this.stateListener.transportDown(this); 240 } 241 throw e; 242 } 243 if (finishConnecting) { 244 finishedConnecting(this.theImpl); 245 } 246 } 247 248 252 public void createImplsMaybe() { 253 long now = SystemTime.currentTimeMillis(); 254 boolean tryCreate = false; 255 synchronized (this) { 256 if ((now - lastConnect) >= connectInterval) { 257 tryCreate = true; 258 connectInterval = (long) 259 (CONNECT_INTERVAL_FACTOR * connectInterval); 260 if (connectInterval > MAX_CONNECT_INTERVAL) { 261 connectInterval = MAX_CONNECT_INTERVAL; 262 } 263 } 264 } 265 if (tryCreate) { 266 try { 267 createImpls(); 268 } catch (IOException e) {} 269 } 270 } 271 272 private void sendInit(TransportImpl impl) { 273 try { 274 sendNetworkMessage(new NetworkMessageID(!this.sentInit, 275 myAgentName), 276 impl); 277 this.sentInit = true; 278 } catch (UnsupportedEncodingException e) {} 279 } 280 281 private void sendKeepalive(long now) { 282 this.lastSent = now; 283 NetworkMessageKeepalive ka = 284 new NetworkMessageKeepalive(this.keepaliveInterval, 285 this.connectionTTL); 286 if (this.theImpl != null && this.theImpl.isConnected()) { 287 sendNetworkMessage(ka, this.theImpl); 288 } 289 if (debugKeepalive){ 290 log.info("Keep Alive lastSent = " + this.lastSent); 291 } 292 } 293 294 299 public synchronized boolean isConnected() { 300 return this.theImpl != null && this.theImpl.isConnected(); 301 } 302 303 309 public synchronized boolean isInitialized() { 310 return this.theImpl != null && this.theImpl.isInitialized(); 311 } 312 313 public synchronized boolean isDown() { 314 return this.theImpl == null || this.theImpl.isDown(); 315 } 316 317 public void setInitialized(InetAddress local, boolean initId) 318 { 319 boolean wasInitialized = isInitialized(); 320 synchronized(this) { 321 322 if (this.theImpl != null) { 323 this.theImpl.setInitialized(); 324 sendBacklog(); 325 } 326 this.connectInterval = MIN_CONNECT_INTERVAL; 327 } 328 if (this.stateListener != null && !wasInitialized) { 329 this.stateListener.transportUp(this); 330 } 331 } 332 333 public void finishedConnecting(SocketChannel channel) { 334 boolean notifyListener = false; 335 synchronized(this) { 336 SelectableChannel existing = this.theImpl.getChannel(); 337 if (existing == channel) { 338 if (this.theImpl.isConnected()) { 339 this.selector.addTransportImpl(this.theImpl, this); 341 this.theImpl.onConnect(); 342 sendInit(this.theImpl); 343 } else { 349 this.theImpl.shutdown(); 350 if (isDown()) { 351 if(log.isWarnEnabled()){ 352 log.warn("Transport " + getInfo().toString() + 353 " can't connect. Clearing backlog (" + 354 this.backlog.size() + 355 " messages deleted)"); 356 } 357 clearBacklog(); 358 if (this.stateListener != null) { 359 notifyListener = true; 360 } 361 } 362 } 363 } 364 } 365 if (notifyListener) { 366 this.stateListener.transportDown(this); 367 } 368 } 369 370 void finishedConnecting(TransportImpl impl) { 371 boolean notifyListener = false; 372 synchronized(this) { 373 if (impl.isConnected()) { 374 this.selector.addTransportImpl(impl, this); 376 impl.onConnect(); 377 sendInit(impl); 378 } else { 384 impl.shutdown(); 385 if (isDown()) { 386 if(log.isWarnEnabled()){ 387 log.warn("Transport " + getInfo().toString() + 388 " can't connect. Clearing backlog (" + 389 this.backlog.size() + " messages deleted)"); 390 } 391 clearBacklog(); 392 if (this.stateListener != null) { 393 notifyListener = true; 394 } 395 } 396 } 397 } 398 if (notifyListener) { 399 this.stateListener.transportDown(this); 400 } 401 } 402 403 private void clearBacklog() { 404 synchronized (this.backlog) { 405 Iterator i = this.backlog.iterator(); 406 while (i.hasNext()) { 407 BacklogItem item = (BacklogItem) i.next(); 408 item.cnlMessage.unuse(); 409 } 410 this.backlog.clear(); 411 } 412 } 413 414 private void sendBacklog() { 415 synchronized (this.backlog) { 416 while (!this.backlog.isEmpty()) { 417 BacklogItem b = (BacklogItem) this.backlog.removeFirst(); 418 realSendMantaMessage(b.cnlMessage); 419 } 420 } 421 } 422 423 private void sendBuffer(CNLMessage message, int id, TransportImpl impl) 424 throws IOException 425 { 426 impl.write(message, id, this.selector); 427 } 428 429 private int sendBuffer(CNLMessage msg, int id) 430 throws IOException 431 { 432 if (this.theImpl != null && this.theImpl.isInitialized()) { 433 try { 434 sendBuffer(msg, id, this.theImpl); 435 } catch (IOException e) { 436 this.theImpl.shutdown(); 437 if (this.stateListener != null) { 438 this.stateListener.transportDown(this); 439 } 440 throw e; 441 } 442 443 this.lastSent = SystemTime.currentTimeMillis(); 444 445 if (debugKeepalive) 446 log.info("Keep Alive lastSent = " + this.lastSent); 447 448 return 1; 449 } else 450 return 0; 451 } 452 453 456 public void shutdown() { 457 if (this.theImpl != null) { 458 this.theImpl.shutdown(); 459 } 460 } 461 462 472 public void sendMantaMessage(CNLMessage cnlMessage) { 473 cnlMessage.use(); 474 if (!isInitialized()) { 475 synchronized (this.backlog) { 476 this.backlog.add(new BacklogItem(cnlMessage)); 477 } 478 } else { 479 realSendMantaMessage(cnlMessage); 480 } 481 } 482 483 private void realSendMantaMessage(CNLMessage cnlMessage) { 484 try { 485 int messageSize; 486 synchronized (this) { 487 sendBuffer(cnlMessage, getSendId()); 488 messageSize = cnlMessage.getTotalLength(); 489 this.totalMessages++; 490 this.totalBytes += messageSize; 491 this.fiveMinMessages.addSample(1); 492 this.fiveMinBytes.addSample(messageSize); 493 this.lastMantaMessage = SystemTime.currentTimeMillis(); 494 } 495 this.statManager.addMessageSample(messageSize); 496 } catch (IOException e) { 497 if(log.isErrorEnabled()){ 498 log.error("Cannot write Manta message to " + this.info.getSocketAddress().toString() + " (" + e.toString() + ")"); 499 } 500 } 501 502 cnlMessage.unuse(); 503 } 504 505 private void sendNetworkMessage(NetworkMessage msg, TransportImpl impl) { 506 int len = NetworkMessage.NET_HEADERLEN + msg.getLength(); 507 ByteBuffer buf = ByteBuffer.allocate(len); 508 buf.limit(len); 509 msg.write(buf); 510 ByteBuffer [] buffers = new ByteBuffer [1]; 511 buffers[0] = buf; 512 CNLMessage cnlMsg = new CNLMessage(CNLMessage.CNL_NETWORK, buffers); 513 514 try { 515 cnlMsg.use(); 516 sendBuffer(cnlMsg, getSendId(), impl); 517 cnlMsg.unuse(); 518 } catch (IOException e) { 519 if(log.isErrorEnabled()) 520 log.error("Cannot write network message to " + 521 this.info.getSocketAddress().toString()); 522 } 523 } 524 525 private synchronized int getSendId() { 526 if (this.lastIdSent == Integer.MAX_VALUE) { 527 this.lastIdSent = 0; 528 } 529 530 this.lastIdSent++; 531 532 return this.lastIdSent; 533 } 534 535 545 public void mergeImpl(TransportImpl newImpl, boolean initId) { 546 boolean notifyListener = false; 547 synchronized(this) { 548 this.lastMantaMessage = SystemTime.currentTimeMillis(); 549 this.lastReceived = this.lastMantaMessage; 550 newImpl.setListener(this); 551 TransportImpl oldImpl = this.theImpl; 552 if(log.isDebugEnabled()){ 553 log.debug("mergeImpl(): old = " + 554 (oldImpl == null ? "null" : oldImpl.toString()) + 555 "; new = " + newImpl.toString()); 556 } 557 if (oldImpl == null || !oldImpl.isConnected()) { 558 this.theImpl = newImpl; 559 sendInit(newImpl); 560 newImpl.setInitialized(); 561 if (this.stateListener != null) { 562 notifyListener = true; 563 } 564 } else { 565 String srcOld = oldImpl.getLocalSocketAddress().toString(); 566 String dstNew = newImpl.getRemoteSocketAddress().toString(); 567 if (srcOld.compareTo(dstNew) > 0) { 568 if(log.isDebugEnabled()){ 570 log.debug("mergeImpl(): shut down new impl."); 571 } 572 newImpl.shutdown(); 573 } else { 574 oldImpl.shutdown(); 576 this.theImpl = newImpl; 577 sendInit(newImpl); 578 newImpl.setInitialized(); 579 if (this.stateListener != null) { 580 notifyListener = true; 581 } 582 if(log.isDebugEnabled()){ 583 log.debug("mergeImpl(): shut down old impl."); 584 } 585 } 586 } 587 sendBacklog(); 588 } 589 if (notifyListener) { 590 this.stateListener.transportUp(this); 591 } 592 } 593 594 595 598 public void acceptedChannel(SocketChannel channel) { 599 if(log.isErrorEnabled()){ 600 log.error("acceptedChannel(channel) is not implemented by Transport"); 601 } 602 } 603 604 public void acceptedImpl(TransportImpl impl) { 605 if(log.isErrorEnabled()){ 606 log.error("acceptedImpl(impl) is not implemented by Transport"); 607 } 608 } 609 610 613 public void messageReady(CNLMessage message) { 614 boolean sendToListener = false; 615 synchronized(this) { 616 int id = message.getID(); 617 if (!checkForKeepalive(message)) { 624 this.lastMantaMessage = SystemTime.currentTimeMillis(); 625 sendToListener = true; 627 } else { 628 IncomingByteBufferPool.getInstance(). 629 release(message.buffers()[0]); 630 } 631 641 this.lastReceived = SystemTime.currentTimeMillis(); 642 } 643 if (sendToListener) { 644 this.listener.messageReady(message); 645 } 646 } 647 648 private boolean checkForKeepalive(CNLMessage message) { 649 if (message.getType() != CNLMessage.CNL_NETWORK) { 650 return false; 651 } 652 ByteBuffer buf = message.valueAsBuffers()[0]; 653 try { 654 NetworkMessageKeepalive keepalive = (NetworkMessageKeepalive) 655 NetworkMessage.create(buf, false, null, null); 656 if (keepalive == null) { 657 return false; 658 } 659 if (debugKeepalive) 660 log.info("Keep Alive[" + this.info.toString() + 661 "] received keep alive"); 662 if (keepalive.getInterval() < this.keepaliveInterval) { 663 this.keepaliveInterval = keepalive.getInterval(); 664 } 665 if (keepalive.getConnectionTTL() > this.connectionTTL) { 666 this.connectionTTL = keepalive.getConnectionTTL(); 667 } 668 669 long now = SystemTime.currentTimeMillis(); 670 if ((now - this.lastSent) >= (this.keepaliveInterval * 1000)) { 671 if (debugKeepalive) 672 log.info("Keep Alive[" + this.info.toString() + 673 "] sending response keep alive (now = " + now + 674 "; lastS = " + lastSent + ")"); 675 sendKeepalive(now); 676 } 677 } catch (ClassCastException e) { 678 return false; 679 } 680 return true; 681 } 682 709 public synchronized List getConnectedImpls() { 710 LinkedList result = new LinkedList (); 711 712 if (this.theImpl != null && this.theImpl.isConnected()) { 713 result.add(this.theImpl); 714 } 715 return result; 716 } 717 718 public long getTotalMessages() { 719 return this.totalMessages; 720 } 721 722 public long getTotalBytes() { 723 return this.totalBytes; 724 } 725 726 public long getFiveMinMessages() { 727 return this.fiveMinMessages.getValue(); 728 } 729 730 public long getFiveMinBytes() { 731 return this.fiveMinBytes.getValue(); 732 } 733 734 public void setStateListener(TransportStateListener listener) { 735 this.stateListener = listener; 736 } 737 738 747 public void keepalive() { 748 boolean shouldShutdown = false; 749 synchronized(this) { 750 long now = SystemTime.currentTimeMillis(); 751 if (this.lastReceived == -1) { 752 this.lastReceived = now; 753 } 754 755 if (isInitialized()) { 756 if (debugKeepalive) 757 log.info("Keep Alive[" + this.info.toString() + 758 "] sending timer keep alive (now = " + now + 759 "; lastS = " + lastSent + ")"); 760 sendKeepalive(now); 761 762 long deadInterval = 763 (long) this.keepaliveInterval * this.deadFactor * 1000; 764 if (deadInterval < 0) { 766 deadInterval = Long.MAX_VALUE; 767 } 768 if ((now - this.lastReceived) >= deadInterval) { 769 if (log.isInfoEnabled()) 770 log.info("Keep Alive[" + this.info.toString() + 771 "] disconnect (now = " + now + 772 "; lastR = " + this.lastReceived + ")"); 773 shouldShutdown = true; 774 } 776 if ((now - this.lastMantaMessage) >= 777 (this.connectionTTL * 60000 + 60000)) { 778 if(log.isInfoEnabled()){ 779 log.info("Keep Alive[" + this.info.toString() + 780 "] connection TTL expired (now = " + now + 781 "; lastManta = " + this.lastMantaMessage + 782 ")"); 783 } 784 this.isTTLExpired = true; 785 shouldShutdown = true; 786 } 788 } else if (isDown() && !isTTLExpired() && !isPassive()) { 789 createImplsMaybe(); 800 } 815 } 816 if (shouldShutdown) { 817 shutdown(); 818 } 819 } 823 824 public boolean isIndirect() { 825 return false; 826 } 827 828 public boolean isPassive() { 829 return this.passive; 830 } 831 832 private boolean isTTLExpired() { 833 return this.isTTLExpired; 834 } 835 836 839 public void activityDetected() { 840 this.lastReceived = SystemTime.currentTimeMillis(); 841 } 842 843 public void implShutdown() { 844 boolean notifyListener = false; 845 long now = SystemTime.currentTimeMillis(); 846 synchronized(this) { 847 clearBacklog(); 848 if ((now - this.lastMantaMessage) >= (this.connectionTTL * 60000) && 849 this.connectInterval == MIN_CONNECT_INTERVAL) { 850 if (this.log.isInfoEnabled()) { 851 this.log.info("Last disconnect was due to TTL. " + 852 "Nothing to worry about."); 853 } 854 this.isTTLExpired = true; 855 } 856 if (this.stateListener != null && !isTTLExpired()) { 857 notifyListener = true; 858 } 859 } 860 if (notifyListener) { 861 this.stateListener.transportDown(this); 862 } 863 } 864 865 public String toString() { 866 return this.info.toString() + "-" + this.remoteAgentName + "-" + 867 hashCode(); 868 } 869 870 public InetSocketAddress getLocalSocketAddress() { 871 if (this.theImpl != null && this.theImpl.isConnected()) { 872 return this.theImpl.getLocalSocketAddress(); 873 } else { 874 return null; 875 } 876 } 877 878 public String getRemoteAgentName() { 879 return this.remoteAgentName; 880 } 881 } | Popular Tags |