1 46 47 52 53 package org.mr.core.net; 54 55 import java.io.IOException ; 56 import java.net.InetAddress ; 57 import java.net.InetSocketAddress ; 58 import java.net.SocketAddress ; 59 import java.nio.ByteBuffer ; 60 import java.nio.channels.SocketChannel ; 61 import java.security.MessageDigest ; 62 import java.util.ArrayList ; 63 import java.util.Collection ; 64 import java.util.HashSet ; 65 import java.util.Iterator ; 66 import java.util.List ; 67 import java.util.Set ; 68 69 import org.apache.commons.logging.Log; 70 import org.apache.commons.logging.LogFactory; 71 import org.mr.MantaAgent; 72 import org.mr.core.configuration.ConfigManager; 73 import org.mr.core.configuration.ConfigurationElement; 74 import org.mr.core.log.StartupLogger; 75 import org.mr.core.net.messages.NetworkMessage; 76 import org.mr.core.net.messages.NetworkMessageHandler; 77 import org.mr.core.net.messages.NetworkMessageID; 78 import org.mr.core.protocol.MantaBusMessage; 79 import org.mr.core.stats.StatManager; 80 import org.mr.core.util.byteable.IncomingByteBufferPool; 81 import org.mr.kernel.delivery.PostOffice; 82 import org.mr.kernel.world.WorldModeler; 83 import org.mr.kernel.world.WorldModelerNetListener; 84 import org.mr.kernel.security.MantaAuthorization; 85 86 import javax.jms.JMSSecurityException ; 87 88 96 public class NetworkManager 97 implements NetworkListener, WorldModelerNetListener, 98 NetworkMessageHandler 99 { 100 private TransportTable transportTable; 103 private NetworkSelector selector; 104 private String myAgentName; 105 106 private Log log; 107 private StatManager statManager; 108 private AgentMonitorManager monitorManager; 109 110 private boolean transportChanged; 111 112 private static boolean tcpNoDelay = true; 113 114 117 public NetworkManager(WorldModeler modeler, StatManager statManager) 118 { 119 this.log = LogFactory.getLog("NetworkManager"); 120 tcpNoDelay = MantaAgent.getInstance().getSingletonRepository() 121 .getConfigManager().getBooleanProperty("network.setTcpNoDelay", true); 122 this.transportTable = new TransportTable(); 123 this.selector = new NetworkSelector(); 124 this.statManager = statManager; 125 this.monitorManager = new AgentMonitorManager(); 126 this.transportChanged = false; 127 128 selector.setListener(this); 129 selector.start(); 130 131 modeler.getNetworkListener().addListener(this); 132 133 134 } 135 136 137 138 143 148 154 public boolean hasConnection(String agent) { 155 Set transports = transportTable.getTransports(agent); 156 if (transports != null && ! transports.isEmpty()) { 157 Iterator i = transports.iterator(); 158 Transport t = (Transport) i.next(); 159 160 return t.isConnected(); 161 } 162 return false; 163 } 164 165 167 170 public void acceptedChannel(SocketChannel channel) { 171 try { 172 InetAddress remote = channel.socket().getInetAddress(); 173 MantaAuthorization auth = 174 MantaAgent.getInstance().getSingletonRepository(). 175 getMantaAuthorization(); 176 if (auth != null) { 177 auth.authorize(remote); 178 } 179 TransportImpl impl = transportTable.addPendingTransport(channel, 180 this); 181 if (impl != null) { 182 this.selector.addTransportImpl(impl, null); 183 } 184 } catch (SecurityException e) { 185 if (this.log.isWarnEnabled()) { 186 this.log.warn("Accepted channel was not authorized: " + 187 channel + ". Discarding."); 188 } 189 try { 190 channel.close(); 191 } catch (IOException e1) {} 192 } 193 } 194 195 public void acceptedImpl(TransportImpl impl) { 196 impl.setListener(this); 197 transportTable.addPendingTransport(impl); 198 } 199 200 203 public void messageReady(CNLMessage message) { 204 if (message.getType() == CNLMessage.CNL_NETWORK) { 205 networkMessageReady(message); 206 } else if (message.getType() == CNLMessage.CNL_MANTA) { 207 mantaMessageReady(message); 208 } 209 } 210 211 public void handleNetMessageID(NetworkMessageID msg) { 213 SocketAddress remote = msg.getSource(); 214 InetAddress local = ((InetSocketAddress ) msg.getDest()).getAddress(); 215 boolean isTCP = msg.isTCP(); 216 boolean initId = msg.getInitId(); 217 218 if(log.isInfoEnabled()){ 219 log.info("HandleID: received ID (" + msg.getName() + ") from " + 220 remote.toString()+"."); 221 } 222 223 if (isTCP && this.transportTable.isPending(remote)) { 224 boolean success = 225 transportTable.associatePending(remote, msg.getName(), 226 this.myAgentName, initId); 227 if (! success) { 228 if(log.isWarnEnabled()) { 229 log.warn("A peer named " + msg.getName() + " tried to " + 230 "connect, but does not appear in the world map." + 231 " Closing connection (" + remote.toString() + 232 ")"); 233 } 234 } 235 } else { 236 Transport t = null; 237 if (isTCP) { 238 t = this.transportTable.getTransport(remote); 239 } else { 240 t = this.transportTable.getUdpTransport(((InetSocketAddress ) remote).getAddress()); 241 } 242 if (t != null) { 243 t.setInitialized(local, initId); 244 } 245 } 246 } 247 248 255 public List getConnections() { 256 return this.transportTable.getConnections(this.myAgentName); 257 } 258 259 public void addAgentStateListener(String agent, 260 AgentStateListener listener) { 261 Set transports = this.transportTable.getTransports(agent); 262 263 if (transports == null) { 264 if (this.log.isFatalEnabled()) { 265 this.log.fatal("Cannot add state listener for " + agent + 266 ": no transports defined."); 267 } 268 throw new IllegalArgumentException ("No transports for peer " + 269 agent); 270 } 271 272 Set accessibleTransports = new HashSet (); 273 Iterator i = transports.iterator(); 274 while (i.hasNext()) { 275 Transport t = (Transport) i.next(); 276 TransportType type = t.getInfo().getTransportInfoType(); 277 if (this.transportTable.isLocalType(type)) { 278 accessibleTransports.add(t); 279 } 280 } 281 this.monitorManager.addMonitor(agent, accessibleTransports, listener); 282 } 283 284 public void removeAgentStateListener(String agent, 285 AgentStateListener listener) { 286 Set transports = this.transportTable.getTransports(agent); 287 this.monitorManager.removeMonitor(agent, transports, listener); 288 } 289 290 public int getAgentState(String agent) { 291 return this.monitorManager.getAgentState(agent); 292 } 293 294 public boolean isAccessible(String agent) { 295 if (agent.equals(this.myAgentName)) { 296 return true; 297 } 298 return this.transportTable.isAccessible(agent); 299 } 300 301 305 public InetAddress getLocalInterface(String agent) { 306 return this.transportTable.getLocalInterface(agent); 307 } 308 309 318 public InetSocketAddress createServer(InetAddress addr, int portMin, 319 int portMax, TransportType type) 320 throws IOException 321 { 322 try { 323 LocalTransport local = 324 TransportProvider.createLocalTransport(type, addr, portMin, 325 portMax, this, 326 this.selector); 327 this.transportTable.addLocalTransport(local); 328 this.transportChanged = true; 329 return (InetSocketAddress ) local.getSocketAddress(); 330 } catch (IllegalArgumentException e) { 331 StartupLogger.log.error("Could not create server" + 333 " socket on " + addr.getHostName() + ":" + 334 portMin + 335 (portMin == portMax ? "" : ("-" + portMax)) + 336 ": " + e.getMessage(), "NetworkManager"); 337 return null; 343 } 344 } 345 346 350 public void destroyServer(InetSocketAddress bindAddr) { 351 LocalTransport local = 352 this.transportTable.removeLocalTransport(bindAddr); 353 this.transportChanged = true; 354 local.close(); 355 } 356 357 360 public void startServers() { 361 Iterator i = this.transportTable.getLocalTransports().iterator(); 362 while (i.hasNext()) { 363 LocalTransport local = (LocalTransport) i.next(); 364 local.open(); 365 } 366 } 367 368 371 public Collection getLocalTransports(){ 372 return this.transportTable.getLocalTransports(); 373 } 374 375 379 public void startServer(InetSocketAddress addr) { 380 LocalTransport local = 381 this.transportTable.getLocalTransport(addr); 382 local.open(); 383 } 384 385 public void sendBuffer(MantaBusMessage message) { 386 Transport out = decideTransport(message.getRealNetAddress()); 387 if(log.isDebugEnabled()){ 388 log.debug("Sending buffer: Message ID=" + message.getMessageId() + 389 " using transport " + 390 (out != null ? out.toString() : "null")); 391 } 392 if (out == null) { 393 if(log.isWarnEnabled() && this.transportChanged){ 394 log.warn("Cannot send message to " + 395 message.getRealNetAddress().getAgentName()+ 396 ": no transports defined for the remote peer, or " + 397 "no local transports exist which match remote " + 398 "peer's transport type. Please review your " + 399 "configuration."); 400 } 401 message.release(false); 402 this.transportChanged = false; 403 return; 404 } 405 406 CNLMessage cnlMessage = null; 407 try { 408 ByteBuffer [] buffers = message.getNetBuffers(); 409 cnlMessage = new CNLMessage(CNLMessage.CNL_MANTA, buffers); 410 cnlMessage.setBusMessage(message); 411 cnlMessage.use(); 412 } catch (IOException e) { 413 if(log.isErrorEnabled()){ 414 log.error("An error occured during message serialization.", e); 415 return; 416 } 417 } 418 419 try { 420 out.createImpls(); 421 } catch (IOException e) { 422 if(log.isWarnEnabled()){ 423 log.warn("Cannot create connection to " + 424 out.getInfo().getSocketAddress().toString() + 425 ": " + e.toString()+"."); 426 } 427 cnlMessage.unuse(); 428 return; 429 } 430 out.sendMantaMessage(cnlMessage); 431 cnlMessage.unuse(); 432 } 433 434 436 public void handleAgentAdded(String agent) { 437 if(log.isDebugEnabled()){ 438 log.debug("New peer discovered: "+agent+"."); 439 } 440 441 if (! agent.equals(myAgentName)) { 442 transportTable.addAgent(agent, new HashSet ()); 443 } 444 } 445 446 public void handleAgentRemoved(String agent) { 447 if(log.isDebugEnabled()){ 448 log.debug("Peer disconnected: "+agent+"."); 449 } 450 451 Set transports = transportTable.removeAgent(agent); 452 if (transports != null) { 453 Iterator i = transports.iterator(); 454 while (i.hasNext()) { 455 Transport t = (Transport) i.next(); 456 t.shutdown(); 457 } 458 } 459 this.monitorManager.removeMonitor(agent, transports); 460 } 461 462 public void handleAgentsTransportAdded(String agent, TransportInfo info) { 463 if(log.isInfoEnabled()){ 464 log.info("Peer transport info resolved: peer="+agent+ 465 ", info="+info.toString()+"."); 466 } 467 String mwbName = "mwb"; 468 boolean indirect = 469 (info.getTransportInfoType() == TransportType.MWB && 470 !agent.equals(mwbName) && !myAgentName.equals(mwbName)); 471 boolean passive = 472 (info.getTransportInfoType() == TransportType.MWB && 473 myAgentName.equals(mwbName)); 474 if (!this.transportTable.exists(agent, info, indirect)) { 475 Transport t = null; 476 if (indirect) { 477 Set set = this.transportTable.getTransports(mwbName); 478 Transport master = null; 479 if (set != null && !set.isEmpty()) { 480 master = (Transport) set.toArray()[0]; 481 } 482 t = new IndirectTransport(info, this.myAgentName, agent, 483 this, this.statManager, 484 this.selector, master); 485 } else { 486 t = new Transport(info, this.myAgentName, agent, 487 this, this.statManager, 488 this.selector, passive); 489 } 490 this.transportTable.addTransport(agent, t); 491 if (indirect || 492 transportTable.isLocalType(info.getTransportInfoType())) { 493 this.monitorManager.addTransport(agent, t); 495 } 496 this.transportChanged = true; 497 } else { 498 if(log.isInfoEnabled()) { 499 log.info("Peer already exists. Doing nothing."); 500 } 501 } 502 } 503 504 public void handleAgentsTransportsAdded(String agent, List infos) { 505 Iterator i = infos.iterator(); 506 while (i.hasNext()) { 507 handleAgentsTransportAdded(agent, (TransportInfo) i.next()); 508 } 509 } 510 511 public void handleAgentsTransportRemoved(String agent, TransportInfo info) 512 { 513 if(log.isInfoEnabled()){ 514 log.info("doHandleAgentsTransportRemoved(): peer="+agent + 515 ", info="+info.toString()+"."); 516 517 } 518 519 boolean indirect = 520 (info.getTransportInfoType() == TransportType.MWB); 521 522 Transport t = 523 this.transportTable.removeTransport(agent, 524 info.getSocketAddress(), 525 indirect); 526 527 if (t != null) { 528 this.monitorManager.removeTransport(agent, t); 529 t.shutdown(); 530 } 531 532 this.transportChanged = true; 533 } 534 535 private void mantaMessageReady(CNLMessage message) { 536 SocketAddress remote = null; 537 if (message.isTCP()) { 538 remote = message.getSourceAddress(); 539 } 540 if (remote != null && this.transportTable.isPending(remote)) { 541 if(log.isWarnEnabled()){ 542 log.warn("Discarding a message from unidentifed channel (remote = " + remote + ")."); 543 } 544 } else { 545 try { 546 byte[] messageMD5 = message.getMessageMD5(); 550 MessageDigest partialMD5 = null; 551 if (messageMD5 != null) { 552 partialMD5 = message.getPartialMD5(); 553 } 554 555 PostOffice post = MantaAgent.getInstance(). 556 getSingletonRepository().getPostOffice(); 557 post.messageArrived(message.buffers()[0], messageMD5, 558 partialMD5); 559 } catch (Throwable t) { 560 if(log.isErrorEnabled()){ 561 log.error("Cannot pass buffer to protocol handler.", t); 562 } 563 } 564 } 565 } 566 567 private void networkMessageReady(CNLMessage message) 568 { 569 ByteBuffer buf = message.valueAsBuffers()[0]; 570 NetworkMessage.create(buf, message.isTCP(), message.getSourceAddress(), message.getDestAddress(), this); 571 IncomingByteBufferPool.getInstance().release(message.buffers()[0]); 572 } 573 574 private Transport decideTransport(MantaAddress address) { 575 String agent = address.getAgentName(); 576 Set transports = this.transportTable.getTransports(agent); 577 Transport best; 578 579 if (transports == null) { 585 return null; 586 } 587 best = findBestTransport(transports, true); 588 if (best == null) { 589 best = findBestTransport(transports, false); 590 } else if (best.getInfo().getTransportInfoType() == 591 TransportType.MWB) { 592 Transport secondBest = findBestTransport(transports, false); 593 if (secondBest != null) { 594 secondBest.createImplsMaybe(); 595 } 596 } 597 598 return best; 599 } 600 601 private Transport findBestTransport(Set transports, boolean connectedOnly) 602 { 603 Iterator i = transports.iterator(); 604 Transport best = null; 605 TransportType bestType = TransportType.UNKNOWN; 606 607 while (i.hasNext()) { 608 Transport t = (Transport) i.next(); 609 TransportType type = t.getInfo().getTransportInfoType(); 610 if (connectedOnly) { 611 if (t.isInitialized() && 612 type.getPriority() < bestType.getPriority() && 613 this.transportTable.isLocalType(type)) { 614 best = t; 615 bestType = type; 616 } 617 } else { 618 if (type.getPriority() < bestType.getPriority() && 619 this.transportTable.isLocalType(type)) { 620 best = t; 621 bestType = type; 622 } 623 } 624 } 625 626 return best; 627 } 628 629 632 public void activityDetected() { 633 } 635 636 public void implShutdown() { 637 } 639 640 641 644 public void createServerSockets() throws IOException { 645 ConfigManager config = MantaAgent.getInstance(). 646 getSingletonRepository().getConfigManager(); 647 648 ArrayList transports = 649 config.getConfigurationElements("network.transports.transport"); 650 651 int size = transports.size(); 652 for (int i = 0; i < size; i++) { 653 ConfigurationElement transProp = 654 (ConfigurationElement) transports.get(i); 655 656 String ip = 657 transProp.getSubConfigurationElementByName("ip").getValue(); 658 String port = 659 transProp.getSubConfigurationElementByName("port").getValue(); 660 String type = 661 transProp.getSubConfigurationElementByName("type").getValue(); 662 InetAddress add = InetAddress.getByName(ip); 663 TransportType transType = 664 TransportType.getTransportTypeFromString(type); 665 666 if (transType == TransportType.UNKNOWN) { 667 StartupLogger.log.error("Error while creating " + 672 "server socket: Unknown transport type: " 673 + type, "NetworkManager"); 674 continue; 675 } 676 int minPort = 0; 677 int maxPort = 0; 678 if(port.indexOf("-") != -1) { 679 minPort = 680 Integer.parseInt(port.substring(0, port.indexOf("-"))); 681 maxPort = 682 Integer.parseInt(port.substring(port.indexOf("-") + 1)); 683 } else { 684 minPort = maxPort = Integer.parseInt(port); 685 } 686 try { 687 createServer(add, minPort, maxPort, transType); 688 } catch (IOException e) { 689 StartupLogger.log.error("NetworkManager ERROR Could not " + 693 "create " + type + 694 " server socket on " + ip + ":" + 695 port + ": " + e.getMessage(), 696 "NetworkManager"); 697 } 698 } 699 } 700 701 public void setMyAgentName(String myAgentName) { 702 this.myAgentName = myAgentName; 703 if(log.isInfoEnabled()){ 704 log.info("Local MantaRay peer name is '" + this.myAgentName + "'"+"."); 705 } 706 } 707 protected static boolean isTcpNoDelay() { 708 return tcpNoDelay; 709 } 710 } 711 | Popular Tags |