1 46 47 48 package org.mr; 49 50 51 import java.io.File ; 52 import java.io.IOException ; 53 import java.util.ArrayList ; 54 import java.util.Enumeration ; 55 56 import org.apache.commons.logging.Log; 57 import org.apache.commons.logging.LogFactory; 58 import org.mr.api.jms.MantaConnection; 59 import org.mr.api.rmi.MantaRMIServer; 60 import org.mr.core.cmc.MantaJMXManagement; 61 import org.mr.core.configuration.ConfigManager; 62 import org.mr.core.groups.MutlicastGroupManager; 63 import org.mr.core.log.StartupLogger; 64 import org.mr.core.net.MantaAddress; 65 import org.mr.core.net.NetworkManager; 66 import org.mr.core.persistent.PersistentConst; 67 import org.mr.core.protocol.MantaBusMessage; 68 import org.mr.core.protocol.MantaBusMessageConsts; 69 import org.mr.core.protocol.MantaBusMessageUtil; 70 import org.mr.core.protocol.MessageTransformer; 71 import org.mr.core.stats.StatManager; 72 import org.mr.core.util.SystemTime; 73 import org.mr.core.util.byteable.ByteBufferPool; 74 import org.mr.core.util.byteable.ByteableRegistry; 75 import org.mr.indexing.WBManager; 76 import org.mr.kernel.BlockingMessageListener; 77 import org.mr.kernel.DelayedMessageSender; 78 import org.mr.kernel.IncomingClientMessageRouter; 79 import org.mr.kernel.IncomingMessageManager; 80 import org.mr.kernel.PluginManager; 81 import org.mr.kernel.UniqueIDGenerator; 82 import org.mr.kernel.security.SecurityInitializer; 83 import org.mr.kernel.control.ControlSignal; 84 import org.mr.kernel.control.ControlSignalMessageConsumer; 85 import org.mr.kernel.delivery.DeliveryAckListener; 86 import org.mr.kernel.delivery.DeliveryAckNotifier; 87 import org.mr.kernel.delivery.PostOffice; 88 import org.mr.kernel.services.MantaService; 89 import org.mr.kernel.services.SelectorsManager; 90 import org.mr.kernel.services.ServiceActor; 91 import org.mr.kernel.services.ServiceActorControlCenter; 92 import org.mr.kernel.services.ServiceConsumer; 93 import org.mr.kernel.services.ServiceProducer; 94 import org.mr.kernel.services.ServiceRecallShutdownHook; 95 import org.mr.kernel.services.queues.*; 96 import org.mr.kernel.services.topics.VirtualTopicManager; 97 import org.mr.kernel.world.WorldModeler; 98 import org.mr.kernel.world.WorldModelerLoader; 99 import org.w3c.dom.Element ; 100 101 102 117 public class MantaAgent { 118 119 private SingletonRepository singletonRepository = null; 120 private DynamicRepository dynamicRepository; 121 private static MantaAgent instance = null; 122 private static Log log; 123 124 125 private String mantaConfigurationFile; 126 127 130 public static boolean started = false; 131 private String myName; 132 137 private static Element configurationElement; 138 139 146 public static MantaAgent getInstance() { 147 if (instance == null) { 148 synchronized(MantaAgent.class){ 149 if (instance == null) 150 instance = new MantaAgent(); 151 } 152 } 153 154 return instance; 155 156 } 158 161 private MantaAgent() { 162 StartupLogger.log.startStore(); 163 try { 164 if (configurationElement == null){ 168 mantaConfigurationFile = System.getProperty(MantaAgentConstants.MANTA_CONFIG); 169 if (mantaConfigurationFile == null) { 170 StartupLogger.log.fatal("MantaRay configuration not set! - Please set the system property 'mantaConfig' to the location of MantaRay configuration file, or use setConfiguration method to supply a DOM element for configuration.", "MantaAgent"); 171 StartupLogger.log.fatal("In order for manta to work properly either of the two configurations need to be set.", "MantaAgent"); 172 mantaConfigurationFile = "./default_config.xml"; 173 } 174 StartupLogger.log.info("property 'mantaConfig'=" + mantaConfigurationFile, "MantaAgent"); 175 176 FileOrFolderExists(mantaConfigurationFile); 178 } 179 } 180 catch (Throwable t) { 181 t.printStackTrace(); 182 } 184 } 186 193 public synchronized boolean init() { 194 if (started) return true; 195 try { 196 ConfigManager configManager = null; 199 if (configurationElement != null){ 200 configManager = new ConfigManager(configurationElement); 201 configurationElement = null; 203 } else 204 configManager = new ConfigManager(mantaConfigurationFile); 205 206 this.singletonRepository = new SingletonRepository(); 207 singletonRepository.setConfigManager(configManager); 208 209 singletonRepository.setVirtualTopicManager(new VirtualTopicManager()); 210 SystemTime.init(); 211 212 StatManager statManager = new StatManager(); 213 singletonRepository.setStatManager(statManager); 214 215 SecurityInitializer.initialize(); 217 218 NetworkManager networkmanager =new NetworkManager(WorldModeler.getInstance(), statManager); 220 networkmanager.createServerSockets(); 221 singletonRepository.setNetworkManager(networkmanager); 222 223 String defaultPersistentDir = "./persistent"; 224 String persistentDir = configManager.getStringProperty("persistency.file.persistent_folder", defaultPersistentDir); 225 if (persistentDir.trim().length() == 0) 226 persistentDir = defaultPersistentDir; 227 PersistentConst.setPersistentDir(persistentDir); 228 int numOfSmallBuffersInPool = configManager.getIntProperty("small_buffer_pool_size", 100); 229 int numOfMediumBuffersInPool= configManager.getIntProperty("medium_buffer_pool_size", 50); 230 int numOfLargeBuffersInPool= configManager.getIntProperty("big_buffer_pool_size", 10); 231 if (configManager.getBooleanProperty("LazyMessageParsing", false)) { 232 MantaBusMessage.setLazyParsing(); 233 } 234 PersistentConst.setPersistentByteBufferPool(new ByteBufferPool(numOfSmallBuffersInPool, numOfMediumBuffersInPool, numOfLargeBuffersInPool)); 235 236 MessageTransformer transformer = new MessageTransformer(); 237 238 ByteableRegistry.init(); 240 241 singletonRepository.setDelayedMessageSender(new DelayedMessageSender()); 244 singletonRepository.setDeliveryAckNotifier(new DeliveryAckNotifier()); 245 246 singletonRepository.setPostOffice(new PostOffice(WorldModeler.getInstance())); 247 248 singletonRepository.setVirtualQueuesManager(new VirtualQueuesManager()); 249 250 252 singletonRepository.setWorldModeler(WorldModeler.getInstance()); 253 singletonRepository.setControlSignalMessageConsumer(new ControlSignalMessageConsumer()); 254 singletonRepository.setIncomingMessageManager(new IncomingMessageManager()); 255 singletonRepository.setIncomingClientMessageRouter(new IncomingClientMessageRouter()); 256 singletonRepository.setSelectorsManager(new SelectorsManager()); 257 singletonRepository.setGroupsManager(new MutlicastGroupManager()); 258 singletonRepository.setServiceActorControlCenter(new ServiceActorControlCenter()); 260 261 WorldModeler world = singletonRepository.getWorldModeler(); 263 WorldModelerLoader.init(world); 264 myName = world.getMyAgentName(); 265 266 dynamicRepository = new DynamicRepository(); 267 dynamicRepository.init(); 268 QueueServiceFactory queueServiceFactory = 269 (QueueServiceFactory) dynamicRepository.getImplementation("queueFactory"); 270 singletonRepository.setQueueServiceFactory(queueServiceFactory); 271 272 MantaJMXManagement jmxManager =MantaJMXManagement.getInstance(); 274 singletonRepository.setMantaJMXManagement(jmxManager); 275 jmxManager.startConnections(); 276 singletonRepository.setMantaJMXManagement(jmxManager); 277 jmxManager.startConnections(); 278 279 WorldModelerLoader.load(world); 282 283 singletonRepository.setWBManager(new WBManager()); 284 285 singletonRepository.setPluginManager(new PluginManager()); 287 288 log=LogFactory.getLog("MantaAgent"); 289 MantaRMIServer.init(); 291 292 MantaJMXManagement.registerJMXBeans(); 294 Runtime.getRuntime().addShutdownHook(new ServiceRecallShutdownHook()); 295 296 networkmanager.startServers(); 297 298 Thread.sleep(configManager.getIntProperty("plug-ins.auto-discovery.init_discovery_delay",1000)); 300 301 log=LogFactory.getLog("MantaAgent"); 302 303 if (StartupLogger.log.hasFatals()) { 304 System.out.println("*** MANTARAY LOADED WITH FATAL ERRORS!!! ***"); 305 log.fatal("MANTARAY LOADED WITH FATAL ERRORS AND WILL NOT BE ABLE TO FUNCTION PROPERLY!!!"); 306 log.fatal("PLEASE REFER TO THE MANTARAY LOG FOR FURTHER INFORMATION."); 307 } else if (StartupLogger.log.hasErrors()) { 308 System.out.println("*** MANTARAY LOADED WITH ERRORS!! ***"); 309 log.error("MANTARAY LOADED WITH ERRORS!! PROBABLY MANTARAY WILL NOT BE ABLE TO FUNCTION PROPERLY!!"); 310 log.error("PLEASE REFER TO THE MANTARAY LOG FOR FURTHER INFORMATION."); 311 } else if (StartupLogger.log.hasWarnings()) { 312 System.out.println("*** MANTARAY LOADED WITH WARNINGS! ***"); 313 log.warn("MANTARAY LOADED WITH WARNINGS! MANTARAY MIGHT NOT BE ABLE TO FUNCTION PROPERLY!"); 314 log.warn("PLEASE REFER TO THE MANTARAY LOG FOR FURTHER INFORMATION."); 315 } else { 316 System.out.println(Version.version+" initialization completed."); 317 log.info(Version.version+" initialization completed."); 318 log.info("MANTARAY LOADED (Don't Panic)."); 319 } 320 } 321 catch (Exception e1) { 322 e1.printStackTrace(); 323 started = true; 324 return false; 325 } 326 started = true; 327 return true; 328 } 329 330 331 337 private boolean FileOrFolderExists(String fileOrFolderName) { 338 File f = new File (fileOrFolderName); 339 if (!f.exists()) { 340 StartupLogger.log.info(fileOrFolderName + " NOT found in your file system!", "MantaAgent"); 342 return false; 343 } else { 344 StartupLogger.log.info(fileOrFolderName + " found.", "MantaAgent"); 346 return true; 347 } 348 349 } 350 351 355 372 public void enqueueMessage(MantaBusMessage message, ServiceProducer producer, byte deliveryMode, byte priority, long timeToLive) throws MantaException { 373 String queueName = producer.getServiceName(); 374 long now = SystemTime.currentTimeMillis(); 375 VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager(); 376 MantaService service = queuesManager.getQueueService(queueName); 377 if (service==null) { 378 String msg = "Queue was not found. Queue="+queueName+"."; 379 if (log.isDebugEnabled()) { 380 log.debug(msg); 381 } 382 throw new MantaException(msg, MantaException.ID_INVALID_ARGUMENTS); 383 } 384 QueueMaster master = queuesManager.getQueueMaster(queueName); 385 if( master ==null) 386 try { 387 if(log.isDebugEnabled()){ 388 log.debug("Waiting for queue coordinator to be created. Queue="+queueName+"."); 389 } long timeToWait = VirtualQueuesManager.getEnqueueWaitforCoordinator(); 391 if(timeToWait ==-1){ 392 timeToWait = timeToLive - now; 394 } 395 queuesManager.waitForQueueMaster(queueName,timeToWait); 396 master = queuesManager.getQueueMaster(queueName); 397 } catch (InterruptedException e1) { 398 throw new MantaException("An error occured while waiting for queue coordinator to be created. Queue="+queueName+". "+e1.getMessage(), MantaException.ID_RECEIVE_GENERAL_FAIL); 399 400 } 401 if( master ==null) { 402 String msg = "Queue coordinator was not found. Queue="+queueName+"."; 403 if (log.isDebugEnabled()) { 404 log.debug(msg); 405 } 406 throw new MantaException(msg, MantaException.ID_INVALID_ARGUMENTS); 407 } 408 409 if (log.isDebugEnabled()) { 410 log.debug("Found queue coordinator for queue. Queue="+queueName+", Coordinator peer="+master.getAgentName()+", Coordinator ID="+master.getId()); 411 } 412 413 message.setPriority(priority); 414 415 message.setDeliveryMode(deliveryMode); 416 message.setValidUntil(timeToLive); 417 418 message.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, queueName); 419 420 if (message.getSource() == null) { 421 MantaAddress add = producer; 422 message.setSource(add); 423 } 424 425 MantaBusMessage controlMsg = MantaBusMessage.getInstance(); 429 430 controlMsg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL); 431 432 ControlSignal control = new ControlSignal(ControlSignal.OPERATION_TYPE_ENQUEUE); 434 control.getParams().put(ControlSignal.ENQUEUED_MESSAGE ,message); 435 controlMsg.setPayload(control); 436 BlockingMessageListener blocker = new BlockingMessageListener(controlMsg); 438 blocker.setListenerString(queueName + control.getControlId()); 440 subscribeMessageListener(blocker, blocker.getListenerString()); 441 442 controlMsg.setRecipient(master); 443 long ctrlTTL = MantaAgentConstants.CONTROL_MESSAGES_TTL + now; 445 if(timeToLive < ctrlTTL){ 446 timeToLive = ctrlTTL; 447 } 448 449 controlMsg.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, queueName); 450 if (log.isDebugEnabled()) { 451 log.debug("Sending control message to queue coordinator. Control Message ID="+controlMsg.getMessageId()+", Coordinator ID="+master.getId()); 452 } 453 454 send(controlMsg, producer, MantaAgentConstants.NON_PERSISTENT, MantaAgentConstants.HIGH, timeToLive); 455 MantaBusMessage result; 456 try { 457 result = blocker.waitForResponse(timeToLive - now); 458 } catch (InterruptedException e) { 459 if (log.isDebugEnabled()) { 460 log.debug("An error occured while waiting for coordinator response to control message. Control Message ID="+controlMsg.getMessageId()+", Coordinator ID="+master.getId(), e); 461 } 462 throw new MantaException("Error while sending message to queue. Queue="+queueName, MantaException.ID_RECEIVE_GENERAL_FAIL); 463 } 464 unsubscribeMessageListener(blocker, blocker.getListenerString()); 465 if(queuesManager.isTempQueue(queueName) && !queuesManager.amIQueueMaster(queueName)) { 466 singletonRepository.getPostOffice().closeBox(master); 467 queuesManager.closeQueue(queueName); 468 } 469 470 if(result == null){ 471 if (log.isDebugEnabled()) { 472 log.debug("Queue coordinator did not respond to control message. Control Message ID="+controlMsg.getMessageId()+", Coordinator ID="+master.getId()); 473 } 474 throw new MantaException("Error while sending message to queue, Queue="+queueName, MantaException.ID_RECEIVE_GENERAL_FAIL); 475 } 476 byte enqRes = Byte.parseByte(result.getHeader(MantaBusMessageConsts.ENQUEUE_STATUS)); 478 if (enqRes == VirtualQueuesManager.ENQUEUE_FAIL) { 479 int queueStrategy = Integer.parseInt(getSingletonRepository().getConfigManager().getStringProperty("jms.queue_overflow_strategy")); 480 String overFlawMsg = "Queue overflow. Queue name=" + queueName + ", message ID " + message.getMessageId(); 481 if (queueStrategy == 482 AbstractQueueService.THROW_EXCEPTION_STRATERGY) { 483 throw new IllegalStateException (overFlawMsg); 484 } else if (queueStrategy == 485 AbstractQueueService. 486 RETURN_WITHOUT_ENQUEUE_STRATERGY) { 487 if (log.isWarnEnabled()) { 488 log.warn(overFlawMsg + ". Message droped."); 489 } 490 } 491 } 492 if (log.isDebugEnabled()) { 493 log.debug("Queue coordinator responded to control message. Control Message ID="+controlMsg.getMessageId()+", Coordinator ID="+master.getId()); 494 } 495 } 497 512 public MantaBusMessage receive(ServiceConsumer consumer) throws MantaException { 513 return receive(consumer, Long.MAX_VALUE); 514 } 515 516 517 534 public MantaBusMessage receive(ServiceConsumer consumer, long timeout) throws MantaException { 535 String queueName = consumer.getServiceName(); 536 WorldModeler world = singletonRepository.getWorldModeler(); 537 538 if (timeout < 0) { 539 timeout = Long.MAX_VALUE; 540 } 541 542 MantaBusMessage result = null; 543 VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager(); 544 545 BlockingMessageListener blocker = registerToQueue(consumer, 1); 547 if (blocker == null ) { 550 try { 551 queuesManager.waitForQueueMaster(queueName, timeout); 552 } 553 catch (InterruptedException e1) { 554 throw new MantaException("This should not happen InterruptedException on service waitForProducerChange" + e1.toString(), MantaException.ID_RECEIVE_GENERAL_FAIL); 555 } 556 blocker = registerToQueue(consumer, 1); 557 } 558 if (blocker == null) return null; 560 try { 562 result = blocker.waitForResponse(timeout); 563 } 564 catch (InterruptedException e) { 565 if(log.isErrorEnabled()){ 566 log.error("Got exception while waiting on receive. ", e); 567 } 568 } 569 570 unsubscribeMessageListener(blocker, blocker.getListenerString()); 571 queuesManager.getSubscriberManager(queueName).removeSubscribeToQueue(consumer); 572 if (result == null) { 575 unregisterFromQueue(consumer , blocker); 576 } 578 579 return result; 580 581 } 582 583 584 597 public Enumeration peekAtQueue(ServiceConsumer consumer) throws MantaException { 598 String queueName = consumer.getServiceName(); 599 VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager(); 600 MantaService service = queuesManager.getQueueService(queueName); 601 if (service == null || consumer.getServiceType() != MantaService.SERVICE_TYPE_QUEUE) { 602 throw new MantaException("No such Service " + queueName, 603 MantaException.ID_INVALID_ARGUMENTS); 604 } 605 QueueMaster master= queuesManager.getQueueMaster(queueName); 607 if(master != null && !singletonRepository.getNetworkManager().isAccessible(master.getAgentName())){ 608 log.error("Queue coordinator not accessible, this might me configuration problem , coordinator="+master.getAgentName()); 609 throw new MantaException("Queue coordinator not accessible " + queueName, MantaException.ID_INVALID_ARGUMENTS); 610 } 611 return new RemoteQueueEnumeration(consumer); 612 } 613 614 615 631 public MantaBusMessage receiveNoWait(ServiceConsumer consumer) throws MantaException { 632 String queueName = consumer.getServiceName(); 633 VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager(); 634 MantaService service = queuesManager.getQueueService(queueName); 635 if (service == null) { 636 throw new MantaException("No such Service " + queueName, MantaException.ID_INVALID_ARGUMENTS); 637 } 638 QueueMaster master = queuesManager.getQueueMaster(queueName); 639 if( master ==null || master.getValidUntil()< SystemTime.currentTimeMillis()){ 641 return null; 642 } 643 MantaBusMessage msg = null; 645 BlockingMessageListener blocker = registerToQueue(consumer, 0); 646 if (blocker == null) return null; 648 649 try { 650 msg = blocker.waitForResponse(10000); 652 if (msg != null) { 653 if (msg.getHeader(MantaBusMessageConsts.HEADER_NAME_IS_EMPTY) != null) { 654 msg = null; 656 } 657 } 658 } 659 catch (InterruptedException e) { 660 if(log.isErrorEnabled()){ 661 log.error("Got an exception while waiting on receiveNoWait. ", e); 662 } 663 664 } 665 unsubscribeMessageListener(blocker, blocker.getListenerString()); 666 queuesManager.getSubscriberManager(consumer.getServiceName()).removeSubscribeToQueue(consumer); 667 668 return msg; 669 } 671 672 680 private BlockingMessageListener registerToQueue(ServiceConsumer consumer, long numberOfReceive) throws MantaException { 681 String queueName = consumer.getServiceName(); 682 VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager(); 683 MantaService service = queuesManager.getQueueService(queueName); 684 if (service == null || service.getServiceType() != MantaService.SERVICE_TYPE_QUEUE) { 685 throw new MantaException("No such queue Service " + queueName, MantaException.ID_INVALID_ARGUMENTS); 686 } 687 688 BlockingMessageListener blocker = new BlockingMessageListener(); 689 queuesManager.getSubscriberManager(consumer.getServiceName()) 690 .subscribeToQueue(consumer, blocker, numberOfReceive); 691 692 return blocker; 693 } 695 696 702 private void unregisterFromQueue(ServiceConsumer consumer ,IMessageListener listener ) throws MantaException { 703 String queueName = consumer.getServiceName(); 704 VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager(); 705 MantaService service = queuesManager.getQueueService(queueName); 706 if (service == null || service.getServiceType() != MantaService.SERVICE_TYPE_QUEUE) { 707 throw new MantaException("No such queue Service " + queueName, 708 MantaException.ID_INVALID_ARGUMENTS); 709 } 710 711 queuesManager.getSubscriberManager(consumer.getServiceName()) 712 .unregisterFromQueue(consumer,listener); 713 714 } 715 716 731 public ArrayList CopyQueueContent(ServiceConsumer consumer) throws MantaException { 732 String queueName = consumer.getServiceName(); 733 VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager(); 734 MantaService service = queuesManager.getQueueService(queueName); 735 if (service == null) { 736 throw new MantaException("No such Service " + queueName, MantaException.ID_INVALID_ARGUMENTS); 737 } 738 ArrayList result = null; 739 QueueMaster master =queuesManager.getQueueMaster(queueName); 741 if(master == null) 743 return new ArrayList (); 744 745 MantaBusMessage msg = MantaBusMessage.getInstance(); 747 BlockingMessageListener listener = new BlockingMessageListener(msg); 748 listener.setListenerString(queueName + consumer.getId()); 749 subscribeMessageListener(listener, listener.getListenerString()); 750 msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL); 751 ControlSignal control = new ControlSignal(ControlSignal.OPERATION_TYPE_GET_QUEUE_COPY); 753 msg.setPayload(control); 754 755 msg.setRecipient(master); 756 757 msg.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, queueName); 758 759 send(msg, consumer, MantaAgentConstants.NON_PERSISTENT, MantaAgentConstants.HIGH, MantaAgentConstants.CONTROL_MESSAGES_TTL+SystemTime.gmtCurrentTimeMillis()); 760 761 try { 762 result = (ArrayList ) listener.waitForResponse(Long.MAX_VALUE).getPayload(); 763 } 764 catch (Exception e) { 765 if(log.isErrorEnabled()){ 766 log.error("Error in copying remote queue, return with empty ArrayList.", e); 767 } 768 769 } 770 if (result == null) { 771 result = new ArrayList (); 772 } 773 774 unsubscribeMessageListener(listener, listener.getListenerString()); 775 return result; 776 777 } 779 780 794 public void subscribeToQueue(ServiceConsumer consumer ,IMessageListener listener) throws MantaException { 795 String queueName = consumer.getServiceName(); 796 VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager(); 797 MantaService service = queuesManager.getQueueService(queueName); 798 if (service == null || service.getServiceType() != MantaService.SERVICE_TYPE_QUEUE) { 799 throw new MantaException("No such queue Service " + queueName, MantaException.ID_INVALID_ARGUMENTS); 800 } 801 802 queuesManager.getSubscriberManager(consumer.getServiceName()) 804 .subscribeToQueue(consumer , listener, Long.MAX_VALUE); 805 806 } 808 809 818 819 public void unsubscribeFromQueue(ServiceConsumer consumer , IMessageListener listener ) throws MantaException { 820 this.unregisterFromQueue(consumer,listener ); 821 } 822 823 824 842 public void publish(MantaBusMessage message, ServiceProducer producer) throws MantaException { 843 publish(message, producer, message.getDeliveryMode(), message.getPriority(), message.getValidUntil()); 844 } 845 846 861 public void publish(MantaBusMessage message, ServiceProducer producer, byte deliveryMode, byte priority, long expiration) throws MantaException { 862 String topic = producer.getServiceName(); 863 VirtualTopicManager topicManager = singletonRepository.getVirtualTopicManager(); 864 865 if (!topicManager.hasTopic(topic)) { 866 throw new MantaException("No such active topic Service " + topic, MantaException.ID_INVALID_ARGUMENTS); 867 } 868 try { 869 topicManager.publish(topic ,message, producer, deliveryMode, priority, expiration); 870 } catch (IOException e) { 871 throw new MantaException("failed to publish "+e.getLocalizedMessage(),MantaException.ID_INVALID_ARGUMENTS ); 872 } 873 874 } 876 885 public void subscribeToTopic(IMessageListener listener, String topic) throws MantaException { 886 VirtualTopicManager topicManager = singletonRepository.getVirtualTopicManager(); 887 if (!topicManager.hasTopic(topic)) { 888 throw new MantaException("No such active topic Service " + topic, MantaException.ID_INVALID_ARGUMENTS); 889 } 890 subscribeMessageListener(listener, topic); 891 } 892 893 894 901 public void unsubscribeFromTopic(IMessageListener listener, String topic) { 902 unsubscribeMessageListener(listener, topic); 903 } 904 905 919 public void subscribeMessageListener(IMessageListener listener, String destination) { 920 singletonRepository.getIncomingClientMessageRouter().addIncommingClientMessageListener(destination, listener); 921 } 922 923 924 932 public void unsubscribeMessageListener(IMessageListener listener, String destination) { 933 singletonRepository.getIncomingClientMessageRouter().removeIncomingClientMessageListener(destination, listener); 934 } 935 936 953 public void send(MantaBusMessage message, MantaAddress sender) { 954 if (message.getSource() == null) { 955 message.setSource(sender); 956 } 957 958 MessageManipulator mm =singletonRepository.getMessageManipulator(); 959 if(mm!= null){ 960 message = mm.manipulate(message, null, MessageManipulator.OUTGOING); 961 } 962 getSingletonRepository().getPostOffice().SendMessage(message); 964 } 965 966 967 982 public void send(MantaBusMessage message, MantaAddress sender, byte deliveryMode, byte priority, long expiration) { 983 984 message.setPriority(priority); 985 986 message.setDeliveryMode(deliveryMode); 987 988 if(message.getValidUntil() < 1){ 989 message.setValidUntil(expiration); 991 992 } 993 994 send(message, sender); 995 996 } 997 998 1002 1013 public void ack(MantaBusMessage messageToAckToo) { 1014 if(messageToAckToo !=null && messageToAckToo.getRecipient() != null && messageToAckToo.getSource() != null){ 1015 MantaBusMessage reply = MantaBusMessageUtil.createACKMessage(messageToAckToo); 1016 1017 String messageId = messageToAckToo.getMessageId(); 1019 reply.addHeader(MantaBusMessageConsts.HEADER_NAME_ACK_RESPONSE_REFERENCE, messageId); 1020 reply.removeHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION); 1021 StartupLogger.log.debug("Sending ACK: Message ID="+ 1029 reply.getMessageId()+ 1030 ", responding to message "+ 1031 messageId,"MantaAgent"); 1032 send(reply, 1033 messageToAckToo.getRecipient(), 1034 MantaAgentConstants.NON_PERSISTENT, 1035 MantaAgentConstants.HIGH, 1036 MantaAgentConstants.ACK_TTL+SystemTime.gmtCurrentTimeMillis()); 1037 } 1038 1039 } 1040 1041 1049 public void ackReject(MantaBusMessage messageToAckTo) { 1050 if(messageToAckTo !=null && messageToAckTo.getRecipient() != null && 1051 messageToAckTo.getSource() != null) { 1052 MantaBusMessage reply = 1053 MantaBusMessageUtil.createACKMessage(messageToAckTo); 1054 1055 String messageId = messageToAckTo.getMessageId(); 1057 reply.addHeader(MantaBusMessageConsts.HEADER_NAME_ACK_REJECT_RESPONSE_REFERENCE, messageId); 1058 reply.removeHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION); 1059 if (log.isDebugEnabled()) { 1060 log.debug("Sending Ack Reject: Message ID=" + 1061 reply.getMessageId() + ", responding to message " + 1062 messageId); 1063 } 1064 send(reply, 1065 messageToAckTo.getRecipient(), 1066 MantaAgentConstants.NON_PERSISTENT, 1067 MantaAgentConstants.HIGH, 1068 MantaAgentConstants.ACK_TTL + 1069 SystemTime.gmtCurrentTimeMillis()); 1070 } 1071 } 1072 1073 1074 1077 public void gotAck(String ackedMessageId, MantaAddress source) { 1078 MantaBusMessage msg = singletonRepository.getPostOffice().gotAck(ackedMessageId ,source ); 1079 if (msg == null) return; 1080 1081 singletonRepository.getDeliveryAckNotifier().gotAck(msg , source); 1082 1083 } 1085 1088 public void gotAckReject(String ackedMessageId, MantaAddress source) { 1089 MantaBusMessage msg = 1092 singletonRepository.getPostOffice().gotAckReject(ackedMessageId, source); 1093 if (msg != null) { 1094 singletonRepository.getDeliveryAckNotifier().gotAckReject(msg, 1095 source); 1096 } 1097 } 1099 1102 public void setAckListener(DeliveryAckListener ackListener) { 1103 if(singletonRepository.getDeliveryAckNotifier() == null){ 1104 singletonRepository.setDeliveryAckNotifier(new DeliveryAckNotifier()); 1105 } 1106 singletonRepository.getDeliveryAckNotifier().setGlobalListener(ackListener); 1107 } 1108 1109 1113 1132 public void advertiseService(ServiceActor serviceActor) throws MantaException { 1133 1134 WorldModeler world = singletonRepository.getWorldModeler(); 1135 1136 if (serviceActor.getServiceType() == MantaService.SERVICE_TYPE_QUEUE) { 1138 VirtualQueuesManager vqm = singletonRepository.getVirtualQueuesManager(); 1140 MantaService queue = vqm.getQueueService(serviceActor.getServiceName()) ; 1141 if (queue == null) { 1142 throw new MantaException("No such Queue " + serviceActor.getServiceName(), MantaException.ID_INVALID_ARGUMENTS); 1143 } 1144 1145 1146 if(serviceActor.getType() == ServiceActor.COORDINATOR){ 1147 world.addCoordinatedService(queue); 1148 QueueMaster coordinator = (QueueMaster) serviceActor; 1149 coordinator.setValidUntil(Long.MAX_VALUE); 1150 vqm.setQueueMaster(serviceActor.getServiceName(), (QueueMaster) serviceActor); 1151 if (!serviceActor.getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) { 1156 singletonRepository.getServiceActorControlCenter().advertiseService(serviceActor, this); 1157 } 1158 vqm.active(serviceActor.getServiceName()); 1160 return; 1161 } else if (serviceActor.getType() == ServiceActor.PRODUCER) { 1162 world.addProducedService(queue); 1163 queue.addProducer((ServiceProducer) serviceActor); 1164 } else if (serviceActor.getType() == ServiceActor.CONSUMER) { 1165 world.addConsumedServices(queue); 1166 queue.addConsumer((ServiceConsumer) serviceActor); 1167 } 1168 if(vqm.isTempQueue(queue.getServiceName())){ 1169 return; 1171 } 1172 1173 }else{ 1174 VirtualTopicManager vtm = getSingletonRepository().getVirtualTopicManager(); 1176 1177 if (serviceActor.getType() == ServiceActor.PRODUCER) { 1178 MantaService topic =(MantaService)getService( serviceActor.getServiceName(), MantaService.SERVICE_TYPE_TOPIC); 1179 if (topic == null) { 1180 throw new MantaException("No such Topic " + serviceActor.getServiceName(), MantaException.ID_INVALID_ARGUMENTS); 1181 } 1182 1183 singletonRepository.getWorldModeler().addProducedService(topic); 1184 topic.addProducer((ServiceProducer) serviceActor); 1185 } else if (serviceActor.getType() == ServiceActor.CONSUMER) { 1186 singletonRepository.getVirtualTopicManager().addConsumer((ServiceConsumer)serviceActor); 1187 } 1188 } 1189 if (serviceActor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC || !serviceActor.getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) { 1190 singletonRepository.getServiceActorControlCenter().advertiseService(serviceActor, this); 1192 } 1193 } 1195 1205 public void recallService(ServiceActor serviceActor) throws MantaException { 1206 1207 WorldModeler world = singletonRepository.getWorldModeler(); 1208 String serviceName = serviceActor.getServiceName(); 1209 if (serviceActor.getType() == ServiceActor.PRODUCER) { 1210 1211 MantaService service = getService( serviceName,serviceActor.getServiceType()); 1212 if (service == null) { 1213 throw new MantaException("No such Service " + serviceName, MantaException.ID_INVALID_ARGUMENTS); 1214 } 1215 1216 service.removeProducer((ServiceProducer) serviceActor); 1217 1218 if (service.getProducersByAgentId(getAgentName()).isEmpty()) { 1219 world.removeProducedService(service); 1220 } 1221 } else if (serviceActor.getType() == ServiceActor.CONSUMER) { 1222 if(serviceActor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC){ 1223 singletonRepository.getVirtualTopicManager().removeConsumer((ServiceConsumer) serviceActor); 1224 }else{ 1225 MantaService service = getService( serviceName,serviceActor.getServiceType()); 1226 if (service == null) { 1227 throw new MantaException("No such Service " + serviceName, MantaException.ID_INVALID_ARGUMENTS); 1228 } 1229 1230 service.removeConsumer((ServiceConsumer) serviceActor); 1231 1232 ServiceConsumer consumer = (ServiceConsumer) serviceActor; 1234 singletonRepository.getVirtualQueuesManager().getSubscriberManager(serviceName).removeSubscribeToQueue(consumer); 1235 1236 if (service.getConsumersByAgentId(getAgentName()).isEmpty()) { 1237 world.removeConsumedServices(service); 1238 } 1239 } 1240 1241 }else if (serviceActor.getType() == ServiceActor.COORDINATOR){ 1242 MantaService service = getService( serviceName,serviceActor.getServiceType()); 1243 if (service == null) { 1244 throw new MantaException("No such Service " + serviceName, MantaException.ID_INVALID_ARGUMENTS); 1245 } 1246 1247 if(singletonRepository.getVirtualQueuesManager().amIQueueMaster(serviceName)==false){ 1248 return; 1250 } 1251 getSingletonRepository().getPostOffice().handleCoordinatorDown((QueueMaster)serviceActor, null); 1252 world.removeCoordinatedService(service); 1253 } 1254 if (!serviceActor.getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) { 1255 singletonRepository.getServiceActorControlCenter().recallService(serviceActor, this); 1257 } 1258 } 1260 1269 public void recallDurableSubscription(ServiceActor durableSubscription) throws MantaException { 1270 1271 WorldModeler world = singletonRepository.getWorldModeler(); 1272 MantaService service =(MantaService) getService( durableSubscription.getServiceName(),durableSubscription.getServiceType()); 1273 if (service == null) { 1274 throw new MantaException("No such Service " + durableSubscription.getServiceName(), MantaException.ID_INVALID_ARGUMENTS); 1275 } 1276 1277 singletonRepository.getVirtualTopicManager().removeDurableConsumer(service.getServiceName(),(ServiceConsumer) durableSubscription); 1278 1279 if (service.getConsumersByAgentId(getAgentName()).isEmpty()) { 1280 world.removeConsumedServices(service); 1281 } 1282 1283 singletonRepository.getServiceActorControlCenter().recallDurableSubscription(durableSubscription, this); 1284 1285 1286 } 1288 1293 public MantaBusMessage getMantaBusMessage() { 1294 return MantaBusMessage.getInstance(); 1295 } 1296 1297 1298 1308 public MantaService getService(String name, byte serviceType) { 1309 MantaService result = null; 1310 if(serviceType == MantaService.SERVICE_TYPE_QUEUE){ 1311 result = singletonRepository.getVirtualQueuesManager().getQueueService(name); 1312 }else{ 1313 result = singletonRepository.getVirtualTopicManager().getTopicService(name); 1314 } 1315 return result; 1316 } 1317 1318 1326 public boolean containsService(String service){ 1327 return singletonRepository.getWorldModeler().containsService(singletonRepository.getWorldModeler().getDefaultDomainName(), service ); 1328 } 1329 1330 1331 1337 public SingletonRepository getSingletonRepository() { 1338 return singletonRepository; 1339 } 1341 1344 public String getMessageId() { 1345 return String.valueOf(UniqueIDGenerator.getNextMessageID()); 1346 } 1347 1348 1349 1354 public String getAgentName() { 1355 1356 return myName; 1357 } 1358 1359 1360 1365 public String getDomainName() { 1366 return singletonRepository.getWorldModeler().getDefaultDomainName(); 1367 } 1368 1369 1370 protected DynamicRepository getDynamicRepository() { 1371 return dynamicRepository; 1372 } 1373 1374 1380 public static void setConfiguration(Element element){ 1381 configurationElement = element; 1382 } 1383 1384 public static boolean isStarted() { 1385 return started; 1386 } 1387} | Popular Tags |