KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > MantaAgent


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat .
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46
47
48 package org.mr;
49
50
51 import java.io.File JavaDoc;
52 import java.io.IOException JavaDoc;
53 import java.util.ArrayList JavaDoc;
54 import java.util.Enumeration JavaDoc;
55
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58 import org.mr.api.jms.MantaConnection;
59 import org.mr.api.rmi.MantaRMIServer;
60 import org.mr.core.cmc.MantaJMXManagement;
61 import org.mr.core.configuration.ConfigManager;
62 import org.mr.core.groups.MutlicastGroupManager;
63 import org.mr.core.log.StartupLogger;
64 import org.mr.core.net.MantaAddress;
65 import org.mr.core.net.NetworkManager;
66 import org.mr.core.persistent.PersistentConst;
67 import org.mr.core.protocol.MantaBusMessage;
68 import org.mr.core.protocol.MantaBusMessageConsts;
69 import org.mr.core.protocol.MantaBusMessageUtil;
70 import org.mr.core.protocol.MessageTransformer;
71 import org.mr.core.stats.StatManager;
72 import org.mr.core.util.SystemTime;
73 import org.mr.core.util.byteable.ByteBufferPool;
74 import org.mr.core.util.byteable.ByteableRegistry;
75 import org.mr.indexing.WBManager;
76 import org.mr.kernel.BlockingMessageListener;
77 import org.mr.kernel.DelayedMessageSender;
78 import org.mr.kernel.IncomingClientMessageRouter;
79 import org.mr.kernel.IncomingMessageManager;
80 import org.mr.kernel.PluginManager;
81 import org.mr.kernel.UniqueIDGenerator;
82 import org.mr.kernel.security.SecurityInitializer;
83 import org.mr.kernel.control.ControlSignal;
84 import org.mr.kernel.control.ControlSignalMessageConsumer;
85 import org.mr.kernel.delivery.DeliveryAckListener;
86 import org.mr.kernel.delivery.DeliveryAckNotifier;
87 import org.mr.kernel.delivery.PostOffice;
88 import org.mr.kernel.services.MantaService;
89 import org.mr.kernel.services.SelectorsManager;
90 import org.mr.kernel.services.ServiceActor;
91 import org.mr.kernel.services.ServiceActorControlCenter;
92 import org.mr.kernel.services.ServiceConsumer;
93 import org.mr.kernel.services.ServiceProducer;
94 import org.mr.kernel.services.ServiceRecallShutdownHook;
95 import org.mr.kernel.services.queues.*;
96 import org.mr.kernel.services.topics.VirtualTopicManager;
97 import org.mr.kernel.world.WorldModeler;
98 import org.mr.kernel.world.WorldModelerLoader;
99 import org.w3c.dom.Element JavaDoc;
100
101
102 /**
103  * This class represents the point of entry to the manta layer. <br>
104  * It holds and expose reference to all the classes needed for working with the
105  * manta layer. <br>
106  * The class is a singleton and thread safe for use. <br>
107  * <b>Note </b> that system property 'mantaConfig' must be set to the path
108  * where the manta configuration file is. To do this use the
109  * System.setProperty("mantaConfig", [i.e c:/manta/config.xml]) or by the -D parameter
110  * in the java command line see examples for proper use.
111  *
112  * @author Amir Shevat
113  * @version 1.0
114  * @see MantaAgentConstants
115  * @since Dec 29, 2003
116  */

117 public class MantaAgent {
118
119     private SingletonRepository singletonRepository = null;
120     private DynamicRepository dynamicRepository;
121     private static MantaAgent instance = null;
122     private static Log log;
123
124
125     private String JavaDoc mantaConfigurationFile;
126
127     /**
128      * true if manta agent has been instanced else false
129      */

130     public static boolean started = false;
131     private String JavaDoc myName;
132     /**
133      * Added by Shirley Sasson - 07/06/06
134      * Holds the configuration as a DOM element. Will be not null onlt if configuration was set from an
135      * outside application using MantaConnectionFactory.setConfiguration method.
136      */

137     private static Element JavaDoc configurationElement;
138
139     /**
140      * This is the singleton entry point calling this method for the <u>first
141      * </u> time inits the agent and should be longer the the rest of the method
142      * called.
143      *
144      * @return the singleton instance of the manta layer API
145      */

146     public static MantaAgent getInstance() {
147         if (instance == null) {
148             synchronized(MantaAgent.class){
149                 if (instance == null)
150                     instance = new MantaAgent();
151             }
152         }
153
154         return instance;
155
156     }//getInstance
157

158     /**
159      * Inits all the manta layer. Constractor for the MantaAgent.
160      */

161     private MantaAgent() {
162         StartupLogger.log.startStore();
163         try {
164             // Added by Shirley Sasson - 07/06/06
165
// If configuration was set by an outside application using MantaConnectionFactory.setConfiguration method,
166
// don't look for a VM paramter with the name mantaConfig.
167
if (configurationElement == null){
168                 mantaConfigurationFile = System.getProperty(MantaAgentConstants.MANTA_CONFIG);
169                 if (mantaConfigurationFile == null) {
170                     StartupLogger.log.fatal("MantaRay configuration not set! - Please set the system property 'mantaConfig' to the location of MantaRay configuration file, or use setConfiguration method to supply a DOM element for configuration.", "MantaAgent");
171                     StartupLogger.log.fatal("In order for manta to work properly either of the two configurations need to be set.", "MantaAgent");
172                     mantaConfigurationFile = "./default_config.xml";
173                 }
174                 StartupLogger.log.info("property 'mantaConfig'=" + mantaConfigurationFile, "MantaAgent");
175
176                 //validate that the params exist end notify about it
177
FileOrFolderExists(mantaConfigurationFile);
178             }
179         }
180         catch (Throwable JavaDoc t) {
181             t.printStackTrace();
182         }//catch
183

184     }//MantaAgent
185

186     /**
187      * starts the manta layer - mainly inits the singletons this method will
188      * run only once after the first run it will allways return true
189      *
190      * @return false if this is the first time this method was invoked and an
191      * error has happend, else true
192      */

193     public synchronized boolean init() {
194         if (started) return true;
195         try {
196             // Added by Shirley Sasson - 07/06/06
197
// Create a new ConfigManager object according to the configuration supplied (file or DOM element).
198
ConfigManager configManager = null;
199             if (configurationElement != null){
200                 configManager = new ConfigManager(configurationElement);
201                 // set reference to null cause not needed anymore and should be garbage collected
202
configurationElement = null;
203             } else
204                 configManager = new ConfigManager(mantaConfigurationFile);
205
206             this.singletonRepository = new SingletonRepository();
207             singletonRepository.setConfigManager(configManager);
208
209             singletonRepository.setVirtualTopicManager(new VirtualTopicManager());
210             SystemTime.init();
211
212             StatManager statManager = new StatManager();
213             singletonRepository.setStatManager(statManager);
214
215             // initialize the authentication and authorization implementations
216
SecurityInitializer.initialize();
217
218             // init network
219
NetworkManager networkmanager =new NetworkManager(WorldModeler.getInstance(), statManager);
220             networkmanager.createServerSockets();
221             singletonRepository.setNetworkManager(networkmanager);
222
223             String JavaDoc defaultPersistentDir = "./persistent";
224             String JavaDoc persistentDir = configManager.getStringProperty("persistency.file.persistent_folder", defaultPersistentDir);
225             if (persistentDir.trim().length() == 0)
226                 persistentDir = defaultPersistentDir;
227             PersistentConst.setPersistentDir(persistentDir);
228             int numOfSmallBuffersInPool = configManager.getIntProperty("small_buffer_pool_size", 100);
229             int numOfMediumBuffersInPool= configManager.getIntProperty("medium_buffer_pool_size", 50);
230             int numOfLargeBuffersInPool= configManager.getIntProperty("big_buffer_pool_size", 10);
231             if (configManager.getBooleanProperty("LazyMessageParsing", false)) {
232                 MantaBusMessage.setLazyParsing();
233             }
234             PersistentConst.setPersistentByteBufferPool(new ByteBufferPool(numOfSmallBuffersInPool, numOfMediumBuffersInPool, numOfLargeBuffersInPool));
235
236             MessageTransformer transformer = new MessageTransformer();
237
238 // init ByteableRegistry
239
ByteableRegistry.init();
240
241             //start jmx addaptors
242
// load jmx - added by lital kasif
243
singletonRepository.setDelayedMessageSender(new DelayedMessageSender());
244             singletonRepository.setDeliveryAckNotifier(new DeliveryAckNotifier());
245
246             singletonRepository.setPostOffice(new PostOffice(WorldModeler.getInstance()));
247
248             singletonRepository.setVirtualQueuesManager(new VirtualQueuesManager());
249
250             //singletonRepository.setVirtualTopicManager(new VirtualTopicManager());
251

252             singletonRepository.setWorldModeler(WorldModeler.getInstance());
253             singletonRepository.setControlSignalMessageConsumer(new ControlSignalMessageConsumer());
254             singletonRepository.setIncomingMessageManager(new IncomingMessageManager());
255             singletonRepository.setIncomingClientMessageRouter(new IncomingClientMessageRouter());
256             singletonRepository.setSelectorsManager(new SelectorsManager());
257             singletonRepository.setGroupsManager(new MutlicastGroupManager());
258 // singletonRepository.setControlSignalMessageSender(new ControlSignalMessageSender());
259
singletonRepository.setServiceActorControlCenter(new ServiceActorControlCenter());
260
261             //init the world
262
WorldModeler world = singletonRepository.getWorldModeler();
263             WorldModelerLoader.init(world);
264             myName = world.getMyAgentName();
265
266             dynamicRepository = new DynamicRepository();
267             dynamicRepository.init();
268             QueueServiceFactory queueServiceFactory =
269                 (QueueServiceFactory) dynamicRepository.getImplementation("queueFactory");
270             singletonRepository.setQueueServiceFactory(queueServiceFactory);
271
272 // load jmx - added by lital kasif
273
MantaJMXManagement jmxManager =MantaJMXManagement.getInstance();
274             singletonRepository.setMantaJMXManagement(jmxManager);
275             jmxManager.startConnections();
276             singletonRepository.setMantaJMXManagement(jmxManager);
277             jmxManager.startConnections();
278
279             // go on with the loader
280
// load loads all the other peers and services
281
WorldModelerLoader.load(world);
282
283             singletonRepository.setWBManager(new WBManager());
284
285             //The Plugin manager is used to initialize plugins.
286
singletonRepository.setPluginManager(new PluginManager());
287
288             log=LogFactory.getLog("MantaAgent");
289             // rmi API
290
MantaRMIServer.init();
291
292             // registers all the JMX managed objects
293
MantaJMXManagement.registerJMXBeans();
294             Runtime.getRuntime().addShutdownHook(new ServiceRecallShutdownHook());
295
296             networkmanager.startServers();
297
298             // nimo, 15JUN2005 - let manta know the world before allowing to publish.
299
Thread.sleep(configManager.getIntProperty("plug-ins.auto-discovery.init_discovery_delay",1000));
300
301             log=LogFactory.getLog("MantaAgent");
302
303             if (StartupLogger.log.hasFatals()) {
304                 System.out.println("*** MANTARAY LOADED WITH FATAL ERRORS!!! ***");
305                 log.fatal("MANTARAY LOADED WITH FATAL ERRORS AND WILL NOT BE ABLE TO FUNCTION PROPERLY!!!");
306                 log.fatal("PLEASE REFER TO THE MANTARAY LOG FOR FURTHER INFORMATION.");
307             } else if (StartupLogger.log.hasErrors()) {
308                 System.out.println("*** MANTARAY LOADED WITH ERRORS!! ***");
309                 log.error("MANTARAY LOADED WITH ERRORS!! PROBABLY MANTARAY WILL NOT BE ABLE TO FUNCTION PROPERLY!!");
310                 log.error("PLEASE REFER TO THE MANTARAY LOG FOR FURTHER INFORMATION.");
311             } else if (StartupLogger.log.hasWarnings()) {
312                 System.out.println("*** MANTARAY LOADED WITH WARNINGS! ***");
313                 log.warn("MANTARAY LOADED WITH WARNINGS! MANTARAY MIGHT NOT BE ABLE TO FUNCTION PROPERLY!");
314                 log.warn("PLEASE REFER TO THE MANTARAY LOG FOR FURTHER INFORMATION.");
315             } else {
316                 System.out.println(Version.version+" initialization completed.");
317                 log.info(Version.version+" initialization completed.");
318                 log.info("MANTARAY LOADED (Don't Panic).");
319             }
320         }
321         catch (Exception JavaDoc e1) {
322             e1.printStackTrace();
323             started = true;
324             return false;
325         }
326         started = true;
327         return true;
328     }
329
330
331     /**
332      * Returns true is file or folder exists else false
333      *
334      * @param fileOrFolderName the name of the file or the folder
335      * @return true is file or folder exists else false
336      */

337     private boolean FileOrFolderExists(String JavaDoc fileOrFolderName) {
338         File JavaDoc f = new File JavaDoc(fileOrFolderName);
339         if (!f.exists()) {
340             //System.out.println(fileOrFolderName + " NOT found in your file system!");
341
StartupLogger.log.info(fileOrFolderName + " NOT found in your file system!", "MantaAgent");
342             return false;
343         } else {
344             //System.out.println(fileOrFolderName + " found.");
345
StartupLogger.log.info(fileOrFolderName + " found.", "MantaAgent");
346             return true;
347         }
348
349     }
350
351     ////////////////////////////////////////
352
/// QUEUES
353
////////////////////////////////////////
354

355     /**
356      * Enqueue a manta bus message to a queue service , this method will not return untill the message is in the queue coordinator side <b>Note: you MUST
357      * advertise yourself as queue producer before invoking this method. </b>
358      *
359      * @param message the message to be enqueued
360      * @param producer hold the queue name and addressing info of the producer of
361      * this message.
362      * @param deliveryMode see MantaAgentConstants for types
363      * @param priority see MantaAgentConstants for types
364      * @param ackType see MantaAgentConstants for types
365      * @param timeToLive after this time this message should not be sent (millisec)
366      * @throws MantaException if no such active topic
367      * @throws MantaException if fail to send to server
368      * @see org.mr.kernel.services.ServiceProducer
369      * @see advertiseService(ServiceActor serviceActor)
370      * @see org.mr.core.protocol.MantaBusMessage
371      */

372     public void enqueueMessage(MantaBusMessage message, ServiceProducer producer, byte deliveryMode, byte priority, long timeToLive) throws MantaException {
373         String JavaDoc queueName = producer.getServiceName();
374         long now = SystemTime.currentTimeMillis();
375         VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager();
376         MantaService service = queuesManager.getQueueService(queueName);
377         if (service==null) {
378             String JavaDoc msg = "Queue was not found. Queue="+queueName+".";
379             if (log.isDebugEnabled()) {
380                 log.debug(msg);
381             }
382             throw new MantaException(msg, MantaException.ID_INVALID_ARGUMENTS);
383         }
384         QueueMaster master = queuesManager.getQueueMaster(queueName);
385         if( master ==null)
386             try {
387                 if(log.isDebugEnabled()){
388                     log.debug("Waiting for queue coordinator to be created. Queue="+queueName+".");
389                 }//if
390
long timeToWait = VirtualQueuesManager.getEnqueueWaitforCoordinator();
391                 if(timeToWait ==-1){
392                     //if timeout is -1 then take the ttl of the message
393
timeToWait = timeToLive - now;
394                 }
395                 queuesManager.waitForQueueMaster(queueName,timeToWait);
396                 master = queuesManager.getQueueMaster(queueName);
397             } catch (InterruptedException JavaDoc e1) {
398                 throw new MantaException("An error occured while waiting for queue coordinator to be created. Queue="+queueName+". "+e1.getMessage(), MantaException.ID_RECEIVE_GENERAL_FAIL);
399
400             }
401         if( master ==null) {
402             String JavaDoc msg = "Queue coordinator was not found. Queue="+queueName+".";
403             if (log.isDebugEnabled()) {
404                 log.debug(msg);
405             }
406             throw new MantaException(msg, MantaException.ID_INVALID_ARGUMENTS);
407         }
408
409         if (log.isDebugEnabled()) {
410             log.debug("Found queue coordinator for queue. Queue="+queueName+", Coordinator peer="+master.getAgentName()+", Coordinator ID="+master.getId());
411         }
412
413         message.setPriority(priority);
414
415         message.setDeliveryMode(deliveryMode);
416         message.setValidUntil(timeToLive);
417
418         message.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, queueName);
419
420         if (message.getSource() == null) {
421             MantaAddress add = producer;
422             message.setSource(add);
423         }
424
425         // difine the control message TTL
426
// now that we have the manta Message ready send it to the coordinator
427
// the message to be sent to the other side
428
MantaBusMessage controlMsg = MantaBusMessage.getInstance();
429
430         controlMsg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL);
431
432         // insert the control payload
433
ControlSignal control = new ControlSignal(ControlSignal.OPERATION_TYPE_ENQUEUE);
434         control.getParams().put(ControlSignal.ENQUEUED_MESSAGE ,message);
435         controlMsg.setPayload(control);
436         // we return this blocker for the calling to wait on
437
BlockingMessageListener blocker = new BlockingMessageListener(controlMsg);
438         //blocker.setListenerString(queueName+msg.getMessageId() );
439
blocker.setListenerString(queueName + control.getControlId());
440         subscribeMessageListener(blocker, blocker.getListenerString());
441
442         controlMsg.setRecipient(master);
443 // difine the control message TTL
444
long ctrlTTL = MantaAgentConstants.CONTROL_MESSAGES_TTL + now;
445         if(timeToLive < ctrlTTL){
446             timeToLive = ctrlTTL;
447         }
448
449         controlMsg.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, queueName);
450         if (log.isDebugEnabled()) {
451             log.debug("Sending control message to queue coordinator. Control Message ID="+controlMsg.getMessageId()+", Coordinator ID="+master.getId());
452         }
453
454         send(controlMsg, producer, MantaAgentConstants.NON_PERSISTENT, MantaAgentConstants.HIGH, timeToLive);
455         MantaBusMessage result;
456         try {
457             result = blocker.waitForResponse(timeToLive - now);
458         } catch (InterruptedException JavaDoc e) {
459             if (log.isDebugEnabled()) {
460                 log.debug("An error occured while waiting for coordinator response to control message. Control Message ID="+controlMsg.getMessageId()+", Coordinator ID="+master.getId(), e);
461             }
462             throw new MantaException("Error while sending message to queue. Queue="+queueName, MantaException.ID_RECEIVE_GENERAL_FAIL);
463         }
464         unsubscribeMessageListener(blocker, blocker.getListenerString());
465         if(queuesManager.isTempQueue(queueName) && !queuesManager.amIQueueMaster(queueName)) {
466             singletonRepository.getPostOffice().closeBox(master);
467             queuesManager.closeQueue(queueName);
468         }
469
470         if(result == null){
471             if (log.isDebugEnabled()) {
472                 log.debug("Queue coordinator did not respond to control message. Control Message ID="+controlMsg.getMessageId()+", Coordinator ID="+master.getId());
473             }
474             throw new MantaException("Error while sending message to queue, Queue="+queueName, MantaException.ID_RECEIVE_GENERAL_FAIL);
475         }
476         //Aviad added here throtoling mechanism
477
byte enqRes = Byte.parseByte(result.getHeader(MantaBusMessageConsts.ENQUEUE_STATUS));
478         if (enqRes == VirtualQueuesManager.ENQUEUE_FAIL) {
479             int queueStrategy = Integer.parseInt(getSingletonRepository().getConfigManager().getStringProperty("jms.queue_overflow_strategy"));
480             String JavaDoc overFlawMsg = "Queue overflow. Queue name=" + queueName + ", message ID " + message.getMessageId();
481             if (queueStrategy ==
482                     AbstractQueueService.THROW_EXCEPTION_STRATERGY) {
483                 throw new IllegalStateException JavaDoc(overFlawMsg);
484             } else if (queueStrategy ==
485                     AbstractQueueService.
486                             RETURN_WITHOUT_ENQUEUE_STRATERGY) {
487                 if (log.isWarnEnabled()) {
488                     log.warn(overFlawMsg + ". Message droped.");
489                 }
490             }
491         }
492         if (log.isDebugEnabled()) {
493             log.debug("Queue coordinator responded to control message. Control Message ID="+controlMsg.getMessageId()+", Coordinator ID="+master.getId());
494         }
495     }//enqueueMessage
496

497     /**
498      * Receives the next message on a queue, blocks until a message is dequeued
499      * from the queue and return to the invoker, the queue name is held in the
500      * ServiceConsumer object. <b>Note: 2 thread should not invoke this method
501      * with the same ServiceConsumer object </b> <b>Note: you MUST advertise
502      * yourself as queue consumer before invoking this method. </b>
503      *
504      * @param consumer holds the queue Name along with information to address the
505      * queue message back to the invoker.
506      * @return a manta bus message from the remote queue
507      * @throws MantaException if fail to receive for any reason
508      * @see org.mr.kernel.services.ServiceConsumer
509      * @see advertiseService(ServiceActor serviceActor)
510      * @see org.mr.core.protocol.MantaBusMessage
511      */

512     public MantaBusMessage receive(ServiceConsumer consumer) throws MantaException {
513         return receive(consumer, Long.MAX_VALUE);
514     }
515
516
517     /**
518      * Receives the next message on a queue, blocks until a message is dequeued
519      * from the queue and return to the invoker or until timeout has expired ,
520      * the queue name is held in the ServiceConsumer object. <b>Note: 2 thread
521      * should not invoke this method with the same ServiceConsumer object </b>
522      * <b>Note: you MUST advertise yourself as queue consumer before invoking
523      * this method. </b>
524      *
525      * @param consumer holds the queue Name along with information to address the
526      * queue message back to the invoker.
527      * @param timeout until when to wait for response
528      * @return a manta bus message from the remote queue
529      * @throws MantaException if fail to receive for any reason
530      * @see org.mr.kernel.services.ServiceConsumer
531      * @see advertiseService(ServiceActor serviceActor)
532      * @see org.mr.core.protocol.MantaBusMessage
533      */

534     public MantaBusMessage receive(ServiceConsumer consumer, long timeout) throws MantaException {
535         String JavaDoc queueName = consumer.getServiceName();
536         WorldModeler world = singletonRepository.getWorldModeler();
537
538         if (timeout < 0) {
539             timeout = Long.MAX_VALUE;
540         }
541
542         MantaBusMessage result = null;
543         VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager();
544
545         // add yourself to the queue receiver list
546
BlockingMessageListener blocker = registerToQueue(consumer, 1);
547         // if blocker is null there are no known master for this queue
548
// we need to wait for a master to advertise itself
549
if (blocker == null ) {
550             try {
551                 queuesManager.waitForQueueMaster(queueName, timeout);
552             }
553             catch (InterruptedException JavaDoc e1) {
554                 throw new MantaException("This should not happen InterruptedException on service waitForProducerChange" + e1.toString(), MantaException.ID_RECEIVE_GENERAL_FAIL);
555             }
556             blocker = registerToQueue(consumer, 1);
557         }
558         //if there is still no master for the queue return null
559
if (blocker == null) return null;
560         // here we wait for response from the master
561
try {
562             result = blocker.waitForResponse(timeout);
563         }
564         catch (InterruptedException JavaDoc e) {
565             if(log.isErrorEnabled()){
566                 log.error("Got exception while waiting on receive. ", e);
567             }
568         }
569
570         unsubscribeMessageListener(blocker, blocker.getListenerString());
571         queuesManager.getSubscriberManager(queueName).removeSubscribeToQueue(consumer);
572         // remove yourself from the queue receiver list
573
// this is done if we got timed out before we got a queue element
574
if (result == null) {
575             unregisterFromQueue(consumer , blocker);
576         }//if
577

578
579         return result;
580
581     }
582
583
584     /**
585      * Returns an Enumeration of the underline remote queue <b>Note: 2 thread
586      * should not invoke this method with the same ServiceConsumer object </b>
587      * <b>Note: you MUST advertise yourself as queue consumer before invoking
588      * this method. </b>
589      *
590      * @param consumer holds the queue Name along with information to address the
591      * queue message back to the invoker.
592      * @return an Enumeration of the underline remote queue
593      * @throws MantaException if service is null or not queue
594      * @see org.mr.kernel.services.ServiceConsumer
595      * @see advertiseService(ServiceActor serviceActor)
596      */

597     public Enumeration JavaDoc peekAtQueue(ServiceConsumer consumer) throws MantaException {
598         String JavaDoc queueName = consumer.getServiceName();
599         VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager();
600         MantaService service = queuesManager.getQueueService(queueName);
601         if (service == null || consumer.getServiceType() != MantaService.SERVICE_TYPE_QUEUE) {
602             throw new MantaException("No such Service " + queueName,
603                     MantaException.ID_INVALID_ARGUMENTS);
604         }
605        // check if the queue coordinator is accessible
606
QueueMaster master= queuesManager.getQueueMaster(queueName);
607         if(master != null && !singletonRepository.getNetworkManager().isAccessible(master.getAgentName())){
608             log.error("Queue coordinator not accessible, this might me configuration problem , coordinator="+master.getAgentName());
609             throw new MantaException("Queue coordinator not accessible " + queueName, MantaException.ID_INVALID_ARGUMENTS);
610         }
611         return new RemoteQueueEnumeration(consumer);
612     }
613
614
615     /**
616      * Receives the next message if one is immediately available. Else the
617      * client returns immediately. the queue name is held in the ServiceConsumer
618      * object. <b>Note: 2 thread should not invoke this method with the same
619      * ServiceConsumer object </b> <b>Note: you MUST advertise yourself as
620      * queue consumer before invoking this method. </b>
621      *
622      * @param consumer holds the queue Name along with information to address the
623      * queue message back to the invoker.
624      * @return MantaBusMessage the message that was dequeued from the queue or
625      * null if non dequeued
626      * @throws MantaException if fail to receive for any reason
627      * @see org.mr.kernel.services.ServiceConsumer
628      * @see advertiseService(ServiceActor serviceActor)
629      * @see org.mr.core.protocol.MantaBusMessage
630      */

631     public MantaBusMessage receiveNoWait(ServiceConsumer consumer) throws MantaException {
632         String JavaDoc queueName = consumer.getServiceName();
633         VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager();
634         MantaService service = queuesManager.getQueueService(queueName);
635         if (service == null) {
636             throw new MantaException("No such Service " + queueName, MantaException.ID_INVALID_ARGUMENTS);
637         }
638         QueueMaster master = queuesManager.getQueueMaster(queueName);
639         // if there is no coordinator then return empty handed
640
if( master ==null || master.getValidUntil()< SystemTime.currentTimeMillis()){
641             return null;
642         }
643         // we have producers go on
644
MantaBusMessage msg = null;
645         BlockingMessageListener blocker = registerToQueue(consumer, 0);
646         // if there are no coordinator for this queue return null
647
if (blocker == null) return null;
648
649         try {
650             // the response should come with null, we should not wait very long
651
msg = blocker.waitForResponse(10000);
652             if (msg != null) {
653                 if (msg.getHeader(MantaBusMessageConsts.HEADER_NAME_IS_EMPTY) != null) {
654                     // we should return null if header empty response is null
655
msg = null;
656                 }
657             }
658         }
659         catch (InterruptedException JavaDoc e) {
660             if(log.isErrorEnabled()){
661                 log.error("Got an exception while waiting on receiveNoWait. ", e);
662             }
663
664         }
665         unsubscribeMessageListener(blocker, blocker.getListenerString());
666         queuesManager.getSubscriberManager(consumer.getServiceName()).removeSubscribeToQueue(consumer);
667
668         return msg;
669     }//receiveNoWait
670

671
672     /**
673      * Registers a agent to listener on a remote queue
674      *
675      * @param consumer the consumer of the queue (holds the queue name and more info)
676      * @param numberOfRecive -after this number of message this agent should not be
677      * notified on and more enqueue messages if numberOfRecive == 0
678      * then this is a no wait receive
679      */

680     private BlockingMessageListener registerToQueue(ServiceConsumer consumer, long numberOfReceive) throws MantaException {
681         String JavaDoc queueName = consumer.getServiceName();
682         VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager();
683         MantaService service = queuesManager.getQueueService(queueName);
684         if (service == null || service.getServiceType() != MantaService.SERVICE_TYPE_QUEUE) {
685             throw new MantaException("No such queue Service " + queueName, MantaException.ID_INVALID_ARGUMENTS);
686         }
687
688         BlockingMessageListener blocker = new BlockingMessageListener();
689         queuesManager.getSubscriberManager(consumer.getServiceName())
690             .subscribeToQueue(consumer, blocker, numberOfReceive);
691
692         return blocker;
693     }//registerToQueue
694

695
696     /**
697      * Removes this agent from listening to a given remote queue
698      *
699      * @param listener the local listener
700      * @param consumer the consumer the registered to the queue
701      */

702     private void unregisterFromQueue(ServiceConsumer consumer ,IMessageListener listener ) throws MantaException {
703         String JavaDoc queueName = consumer.getServiceName();
704         VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager();
705         MantaService service = queuesManager.getQueueService(queueName);
706         if (service == null || service.getServiceType() != MantaService.SERVICE_TYPE_QUEUE) {
707             throw new MantaException("No such queue Service " + queueName,
708                     MantaException.ID_INVALID_ARGUMENTS);
709         }
710
711         queuesManager.getSubscriberManager(consumer.getServiceName())
712             .unregisterFromQueue(consumer,listener);
713
714     }
715
716     /**
717      * Returns a copy of the content of a remote queue. <b>Note: 2 thread
718      * should not invoke this method with the same ServiceConsumer object </b>
719      * <b>Note: you MUST advertise yourself as queue consumer before invoking
720      * this method. </b>
721      *
722      * @param consumer holds the queue Name along with information to address the
723      * queue message back to the invoker.
724      * @return a copy of a remote queue messages: LinkedList with
725      * MantaBusMessages
726      * @throws MantaException if service == null or remote error happens
727      * @see org.mr.kernel.services.ServiceConsumer
728      * @see advertiseService(ServiceActor serviceActor)
729      * @see org.mr.core.protocol.MantaBusMessage
730      */

731     public ArrayList JavaDoc CopyQueueContent(ServiceConsumer consumer) throws MantaException {
732         String JavaDoc queueName = consumer.getServiceName();
733         VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager();
734         MantaService service = queuesManager.getQueueService(queueName);
735         if (service == null) {
736             throw new MantaException("No such Service " + queueName, MantaException.ID_INVALID_ARGUMENTS);
737         }
738         ArrayList JavaDoc result = null;
739 // find the destination
740
QueueMaster master =queuesManager.getQueueMaster(queueName);
741         // if master is no where to be found return null
742
if(master == null)
743             return new ArrayList JavaDoc();
744
745         // go on to copy from producers
746
MantaBusMessage msg = MantaBusMessage.getInstance();
747         BlockingMessageListener listener = new BlockingMessageListener(msg);
748         listener.setListenerString(queueName + consumer.getId());
749         subscribeMessageListener(listener, listener.getListenerString());
750         msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CONTROL);
751         // insert the control payload
752
ControlSignal control = new ControlSignal(ControlSignal.OPERATION_TYPE_GET_QUEUE_COPY);
753         msg.setPayload(control);
754
755         msg.setRecipient(master);
756
757         msg.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, queueName);
758
759         send(msg, consumer, MantaAgentConstants.NON_PERSISTENT, MantaAgentConstants.HIGH, MantaAgentConstants.CONTROL_MESSAGES_TTL+SystemTime.gmtCurrentTimeMillis());
760
761         try {
762             result = (ArrayList JavaDoc) listener.waitForResponse(Long.MAX_VALUE).getPayload();
763         }
764         catch (Exception JavaDoc e) {
765             if(log.isErrorEnabled()){
766                 log.error("Error in copying remote queue, return with empty ArrayList.", e);
767             }
768
769         }
770         if (result == null) {
771             result = new ArrayList JavaDoc();
772         }
773
774         unsubscribeMessageListener(listener, listener.getListenerString());
775         return result;
776
777     }//CopyQueueContent
778

779
780     /**
781      * Tells the queue coordinator that this service consumer wishes to do multipul Receives (stoped when calling the unsubscribeMessageListenerToQueue)
782      * when a message is ready on the queue side and the coordinator sends the message to the consumer the onMessage method will be invoked of the given listener
783      * in simple terms this transfers the pull of the queue to push
784      * <b>Note: 2 thread
785      * should not invoke this method with the same ServiceConsumer object </b>
786      * <b>Note: you MUST advertise yourself as queue consumer before invoking
787      * this method. </b>
788      *
789      * @param consumer holds the queue Name along with information to address the
790      * queue message back to the invoker.
791      * @param listener from the interface IMessageListener
792      * @throws MantaException in Communication exception
793      */

794     public void subscribeToQueue(ServiceConsumer consumer ,IMessageListener listener) throws MantaException {
795         String JavaDoc queueName = consumer.getServiceName();
796          VirtualQueuesManager queuesManager = singletonRepository.getVirtualQueuesManager();
797          MantaService service = queuesManager.getQueueService(queueName);
798          if (service == null || service.getServiceType() != MantaService.SERVICE_TYPE_QUEUE) {
799             throw new MantaException("No such queue Service " + queueName, MantaException.ID_INVALID_ARGUMENTS);
800         }
801
802         // find the destinations
803
queuesManager.getSubscriberManager(consumer.getServiceName())
804             .subscribeToQueue(consumer , listener, Long.MAX_VALUE);
805
806     }//subscribeMessageListenerToQueue
807

808
809     /**
810      * Tells the queue coordinator that the request to no multiple receives is no longer needed and the receiver should be removed
811      * after this point the listener will not be invoked with the onMessage method
812      * in simple terms this undoes what subscribeMessageListenerToQueue has done
813      *
814      * @param consumer the consumer that wishes to stop getting messages from the queue
815      * @param listener the listener that is going to be remove from the listener to this queue
816      * @throws MantaException if fails to unregister
817      */

818
819     public void unsubscribeFromQueue(ServiceConsumer consumer , IMessageListener listener ) throws MantaException {
820         this.unregisterFromQueue(consumer,listener );
821     }
822
823
824     ///////////////////////////////////////////////////////////
825
// Topics
826
///////////////////////////////////////////////////////////
827
/**
828      * Publishes the message to a topic (deliveryMode , priority , timeToLive,
829      * and other properties are taken from default) <b>Note: you MUST advertise
830      * yourself as topic producer before invoking this method. </b>
831      *
832      * @param message
833      * the message to be published
834      * @param producer
835      * hold the topic name and addressing info of the producer of
836      * this message.
837      * @see org.mr.kernel.services.ServiceProducer
838      * @see advertiseService(ServiceActor serviceActor)
839      * @see org.mr.core.protocol.MantaBusMessage
840
841      */

842     public void publish(MantaBusMessage message, ServiceProducer producer) throws MantaException {
843         publish(message, producer, message.getDeliveryMode(), message.getPriority(), message.getValidUntil());
844     }
845
846     /**
847      * Publishes a message to the topic, specifying delivery mode, priority, and
848      * time to live. <b>Note: you MUST advertise yourself as topic producer
849      * before invoking this method. </b>
850      *
851      * @param message the message to be published
852      * @param producer hold the topic name and addressing info of the producer of
853      * this message.
854      * @param deliveryMode see MantaAgentConstants for types
855      * @param priority see MantaAgentConstants for types
856      * @param expiration after this time this message should not be sent (millisec GMT)
857      * @see org.mr.kernel.services.ServiceProducer
858      * @see advertiseService(ServiceActor serviceActor)
859      * @see org.mr.core.protocol.MantaBusMessage
860      */

861     public void publish(MantaBusMessage message, ServiceProducer producer, byte deliveryMode, byte priority, long expiration) throws MantaException {
862         String JavaDoc topic = producer.getServiceName();
863         VirtualTopicManager topicManager = singletonRepository.getVirtualTopicManager();
864
865         if (!topicManager.hasTopic(topic)) {
866             throw new MantaException("No such active topic Service " + topic, MantaException.ID_INVALID_ARGUMENTS);
867         }
868         try {
869             topicManager.publish(topic ,message, producer, deliveryMode, priority, expiration);
870         } catch (IOException JavaDoc e) {
871             throw new MantaException("failed to publish "+e.getLocalizedMessage(),MantaException.ID_INVALID_ARGUMENTS );
872         }
873
874     }//Publish
875

876     /**
877      * Sets a Message Listener, message listeners are called back by the
878      * manta's layer logic upon receiving a message. if a mantaBusMessage
879      * arrives with a destination that is this given destination the
880      * IMessageListener onMessage method will be called. this method should be used on topics only
881      *
882      * @param listener from the interface IMessageListener
883      * @see IMessageListener
884      */

885     public void subscribeToTopic(IMessageListener listener, String JavaDoc topic) throws MantaException {
886         VirtualTopicManager topicManager = singletonRepository.getVirtualTopicManager();
887         if (!topicManager.hasTopic(topic)) {
888             throw new MantaException("No such active topic Service " + topic, MantaException.ID_INVALID_ARGUMENTS);
889         }
890         subscribeMessageListener(listener, topic);
891     }
892
893
894     /**
895      * Removes a Message Listener from the agent logic. This can be thought of
896      * as a unsubscribing to some Template (topic)
897      *
898      * @param listener from the interface IMessageListener
899      * @see IMessageListener
900      */

901     public void unsubscribeFromTopic(IMessageListener listener, String JavaDoc topic) {
902         unsubscribeMessageListener(listener, topic);
903     }
904
905     ///////////////////////
906
/// general
907
//////////////////////
908
/**
909      * for internal use only.
910      * Sets a Message Listener, message listeners are called back by the
911      * manta's layer logic upon receiving a message. if a mantaBusMessage
912      * arrives with a destination that is this given destination the
913      * IMessageListener onMessage method will be called
914      *
915      * @param listener
916      * from the interface IMessageListener
917      * @see org.mr.IMessageListener
918      */

919     public void subscribeMessageListener(IMessageListener listener, String JavaDoc destination) {
920         singletonRepository.getIncomingClientMessageRouter().addIncommingClientMessageListener(destination, listener);
921     }
922
923
924     /**
925      * for internal use only
926      * Removes a Message Listener from the agent logic. This can be thought of
927      * as a unsubscribing to some Template (topic)
928      *
929      * @param listener from the interface IMessageListener
930      * @see IMessageListener
931      */

932     public void unsubscribeMessageListener(IMessageListener listener, String JavaDoc destination) {
933         singletonRepository.getIncomingClientMessageRouter().removeIncomingClientMessageListener(destination, listener);
934     }
935
936     /**
937      * Sends a message to a remote agent We assume that the message has a valid
938      * headers like destination source and so on for the message to be received
939      * on the other side a IMessageLister should be up and listening to the
940      * logical destination of this message.
941      * <p/>
942      * MantaBusMessage message must hold all the proper info (ack type ,
943      * priority ....) for the message to be sent
944      *
945      * @param message the message to be sent
946      * @param sender the sender of the message holds the return info of the message
947      * @see org.mr.core.protocol.MantaBusMessage
948      * @see org.mr.core.protocol.MantaBusMessageConsts
949      * @see org.mr.kernel.services.ServiceProducer
950      * @see org.mr.kernel.services.ServiceConsumer
951      * @see setMessageListener(IMessageListener listener ,String destination)
952      */

953     public void send(MantaBusMessage message, MantaAddress sender) {
954         if (message.getSource() == null) {
955             message.setSource(sender);
956         }
957
958         MessageManipulator mm =singletonRepository.getMessageManipulator();
959         if(mm!= null){
960             message = mm.manipulate(message, null, MessageManipulator.OUTGOING);
961         }
962         // send the message
963
getSingletonRepository().getPostOffice().SendMessage(message);
964     }
965
966
967     /**
968      * sends a message to an agent , specifying delivery mode, priority and time
969      * to live.
970      *
971      * @param message the message to be sent *
972      * @param sender the sender of the message holds the return info of the message
973      * @param deliveryMode see MantaAgentConstants for valid values
974      * @param priority see MantaAgentConstants for valid values
975      * @param expiration in millisec GMT
976      * @see org.mr.core.protocol.MantaBusMessage
977      * @see org.mr.core.protocol.MantaBusMessageConsts
978      * @see org.mr.kernel.services.ServiceProducer
979      * @see org.mr.kernel.services.ServiceConsumer
980      * @see setMessageListener(IMessageListener listener ,String destination)
981      */

982     public void send(MantaBusMessage message, MantaAddress sender, byte deliveryMode, byte priority, long expiration) {
983
984         message.setPriority(priority);
985
986         message.setDeliveryMode(deliveryMode);
987
988         if(message.getValidUntil() < 1){
989             // if ValidUntil was not set set it
990
message.setValidUntil(expiration);
991
992         }
993
994         send(message, sender);
995
996     }
997
998     //////////////////////////////////////
999
// acks
1000
/////////////////////////////////////
1001

1002    /**
1003     * Tells the sender of a message that this receiver has got the message and
1004     * not resending to this receive address is needed.
1005     * <p/>
1006     * the point in the code where this method is called after the receiving of
1007     * a message depends if the ack type of the message
1008     *
1009     * @param messageToAckToo the original message that we want to ack to
1010     * @see org.mr.core.protocol.MantaBusMessage
1011     * @see org.mr.core.protocol.MantaBusMessageConsts
1012     */

1013    public void ack(MantaBusMessage messageToAckToo) {
1014        if(messageToAckToo !=null && messageToAckToo.getRecipient() != null && messageToAckToo.getSource() != null){
1015            MantaBusMessage reply = MantaBusMessageUtil.createACKMessage(messageToAckToo);
1016
1017            // set the ack to header
1018
String JavaDoc messageId = messageToAckToo.getMessageId();
1019            reply.addHeader(MantaBusMessageConsts.HEADER_NAME_ACK_RESPONSE_REFERENCE, messageId);
1020            reply.removeHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION);
1021            //Aviad use startupLogger
1022
// if (log.isDebugEnabled()) {
1023
// log.debug("Sending ACK: Message ID="+
1024
// reply.getMessageId()+
1025
// ", responding to message "+
1026
// messageId);
1027
// }
1028
StartupLogger.log.debug("Sending ACK: Message ID="+
1029                            reply.getMessageId()+
1030                            ", responding to message "+
1031                            messageId,"MantaAgent");
1032            send(reply,
1033                 messageToAckToo.getRecipient(),
1034                 MantaAgentConstants.NON_PERSISTENT,
1035                 MantaAgentConstants.HIGH,
1036                 MantaAgentConstants.ACK_TTL+SystemTime.gmtCurrentTimeMillis());
1037        }
1038
1039    }
1040
1041    /**
1042     * Tells the sender of a message that no receiver has got the message and
1043     * the message should be returned to the queue.
1044     *
1045     * @param messageToAckTo the original message that we want to ack to
1046     * @see org.mr.core.protocol.MantaBusMessage
1047     * @see org.mr.core.protocol.MantaBusMessageConsts
1048     */

1049    public void ackReject(MantaBusMessage messageToAckTo) {
1050        if(messageToAckTo !=null && messageToAckTo.getRecipient() != null &&
1051           messageToAckTo.getSource() != null) {
1052            MantaBusMessage reply =
1053                MantaBusMessageUtil.createACKMessage(messageToAckTo);
1054
1055            // set the ack to header
1056
String JavaDoc messageId = messageToAckTo.getMessageId();
1057            reply.addHeader(MantaBusMessageConsts.HEADER_NAME_ACK_REJECT_RESPONSE_REFERENCE, messageId);
1058            reply.removeHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION);
1059            if (log.isDebugEnabled()) {
1060                log.debug("Sending Ack Reject: Message ID=" +
1061                          reply.getMessageId() + ", responding to message " +
1062                          messageId);
1063            }
1064            send(reply,
1065                 messageToAckTo.getRecipient(),
1066                 MantaAgentConstants.NON_PERSISTENT,
1067                 MantaAgentConstants.HIGH,
1068                 MantaAgentConstants.ACK_TTL +
1069                 SystemTime.gmtCurrentTimeMillis());
1070        }
1071    }
1072
1073
1074    /**
1075     * do not use this method it is for inner agent use
1076     */

1077    public void gotAck(String JavaDoc ackedMessageId, MantaAddress source) {
1078        MantaBusMessage msg = singletonRepository.getPostOffice().gotAck(ackedMessageId ,source );
1079        if (msg == null) return;
1080
1081        singletonRepository.getDeliveryAckNotifier().gotAck(msg , source);
1082
1083    }//gotAck
1084

1085    /**
1086     * do not use this method it is for inner agent use
1087     */

1088    public void gotAckReject(String JavaDoc ackedMessageId, MantaAddress source) {
1089        // it don't matter to the post office! ACK, Reject, whatever:
1090
// it only wants to delete the saveed message.
1091
MantaBusMessage msg =
1092            singletonRepository.getPostOffice().gotAckReject(ackedMessageId, source);
1093        if (msg != null) {
1094            singletonRepository.getDeliveryAckNotifier().gotAckReject(msg,
1095                                                                      source);
1096        }
1097    }//gotAck
1098

1099    /**
1100     * @param ackListener The ackListener to set.
1101     */

1102    public void setAckListener(DeliveryAckListener ackListener) {
1103        if(singletonRepository.getDeliveryAckNotifier() == null){
1104            singletonRepository.setDeliveryAckNotifier(new DeliveryAckNotifier());
1105        }
1106        singletonRepository.getDeliveryAckNotifier().setGlobalListener(ackListener);
1107    }
1108
1109    ///////////////////////////////////////////////////////////
1110
// General methods relating to service operations
1111
///////////////////////////////////////////////////////////
1112

1113    /**
1114     * Tells all the other layer that this layer is a producer or a consumer
1115     * (role) of a queue or a topic (service) this method MUST be called before
1116     * you perform operations on a service. example: before publishing stock
1117     * messages to a stock topic you call this method with a producer role,
1118     * then you can publish as many message as you want on this topic. if you
1119     * publish a message to a topic without using this method no one will
1120     * consume this message and it will be lost.
1121     * <p/>
1122     * For a given service and a given role a one-time advertise should be done
1123     * unless you recall the role using the recallService(ServiceActor
1124     * serviceActor). You could at any time recall your role to a service.
1125     *
1126     * @param serviceActor hold the service name and the role of the invoker of this
1127     * method
1128     * @throws MantaException if service not found
1129     * @see org.mr.kernel.services.ServiceProducer
1130     * @see org.mr.kernel.services.ServiceConsumer
1131     */

1132    public void advertiseService(ServiceActor serviceActor) throws MantaException {
1133
1134        WorldModeler world = singletonRepository.getWorldModeler();
1135
1136        // activate the service
1137
if (serviceActor.getServiceType() == MantaService.SERVICE_TYPE_QUEUE) {
1138            // this is a queue
1139
VirtualQueuesManager vqm = singletonRepository.getVirtualQueuesManager();
1140            MantaService queue = vqm.getQueueService(serviceActor.getServiceName()) ;
1141            if (queue == null) {
1142                throw new MantaException("No such Queue " + serviceActor.getServiceName(), MantaException.ID_INVALID_ARGUMENTS);
1143            }
1144
1145
1146            if(serviceActor.getType() == ServiceActor.COORDINATOR){
1147                world.addCoordinatedService(queue);
1148                QueueMaster coordinator = (QueueMaster) serviceActor;
1149                coordinator.setValidUntil(Long.MAX_VALUE);
1150                vqm.setQueueMaster(serviceActor.getServiceName(), (QueueMaster) serviceActor);
1151                // This is a patch:
1152
// In case of queue coordinator we want to advertise before recovering
1153
// the messages. That prevents other peers to advertise them selves
1154
// as coordinators while this agent recovers.
1155
if (!serviceActor.getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) {
1156                    singletonRepository.getServiceActorControlCenter().advertiseService(serviceActor, this);
1157                }
1158                // activate the queue
1159
vqm.active(serviceActor.getServiceName());
1160                return;
1161            } else if (serviceActor.getType() == ServiceActor.PRODUCER) {
1162                world.addProducedService(queue);
1163                queue.addProducer((ServiceProducer) serviceActor);
1164            } else if (serviceActor.getType() == ServiceActor.CONSUMER) {
1165                world.addConsumedServices(queue);
1166                queue.addConsumer((ServiceConsumer) serviceActor);
1167            }
1168            if(vqm.isTempQueue(queue.getServiceName())){
1169                // do not advertise temp roles
1170
return;
1171            }
1172
1173        }else{
1174            // this is a topic
1175
VirtualTopicManager vtm = getSingletonRepository().getVirtualTopicManager();
1176
1177            if (serviceActor.getType() == ServiceActor.PRODUCER) {
1178                MantaService topic =(MantaService)getService( serviceActor.getServiceName(), MantaService.SERVICE_TYPE_TOPIC);
1179                if (topic == null) {
1180                    throw new MantaException("No such Topic " + serviceActor.getServiceName(), MantaException.ID_INVALID_ARGUMENTS);
1181                }
1182
1183                singletonRepository.getWorldModeler().addProducedService(topic);
1184                topic.addProducer((ServiceProducer) serviceActor);
1185            } else if (serviceActor.getType() == ServiceActor.CONSUMER) {
1186                singletonRepository.getVirtualTopicManager().addConsumer((ServiceConsumer)serviceActor);
1187            }
1188        }
1189        if (serviceActor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC || !serviceActor.getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) {
1190            // temp queue roles should not be advertise
1191
singletonRepository.getServiceActorControlCenter().advertiseService(serviceActor, this);
1192        }
1193    }//advertiseService
1194

1195    /**
1196     * Recalls a previously advertised role (consumer or producer) in service
1197     * (queue or a topic) which is not longer accessible from this layer. if at
1198     * runtime you want to stop been a consumer or a producer (i.e when going
1199     * offline) you SHOULD use this method to recalls the role in the service
1200     *
1201     * @throws MantaException if service not found
1202     * @see org.mr.kernel.services.ServiceProducer
1203     * @see org.mr.kernel.services.ServiceConsumer
1204     */

1205    public void recallService(ServiceActor serviceActor) throws MantaException {
1206
1207        WorldModeler world = singletonRepository.getWorldModeler();
1208        String JavaDoc serviceName = serviceActor.getServiceName();
1209        if (serviceActor.getType() == ServiceActor.PRODUCER) {
1210
1211            MantaService service = getService( serviceName,serviceActor.getServiceType());
1212            if (service == null) {
1213                throw new MantaException("No such Service " + serviceName, MantaException.ID_INVALID_ARGUMENTS);
1214            }
1215
1216            service.removeProducer((ServiceProducer) serviceActor);
1217
1218            if (service.getProducersByAgentId(getAgentName()).isEmpty()) {
1219                world.removeProducedService(service);
1220            }
1221        } else if (serviceActor.getType() == ServiceActor.CONSUMER) {
1222             if(serviceActor.getServiceType() == MantaService.SERVICE_TYPE_TOPIC){
1223                singletonRepository.getVirtualTopicManager().removeConsumer((ServiceConsumer) serviceActor);
1224             }else{
1225                MantaService service = getService( serviceName,serviceActor.getServiceType());
1226                if (service == null) {
1227                    throw new MantaException("No such Service " + serviceName, MantaException.ID_INVALID_ARGUMENTS);
1228                }
1229
1230                service.removeConsumer((ServiceConsumer) serviceActor);
1231
1232                // this is a queue lets remove its subscription to the queue
1233
ServiceConsumer consumer = (ServiceConsumer) serviceActor;
1234                singletonRepository.getVirtualQueuesManager().getSubscriberManager(serviceName).removeSubscribeToQueue(consumer);
1235
1236                if (service.getConsumersByAgentId(getAgentName()).isEmpty()) {
1237                    world.removeConsumedServices(service);
1238                }
1239             }
1240
1241        }else if (serviceActor.getType() == ServiceActor.COORDINATOR){
1242            MantaService service = getService( serviceName,serviceActor.getServiceType());
1243            if (service == null) {
1244                throw new MantaException("No such Service " + serviceName, MantaException.ID_INVALID_ARGUMENTS);
1245            }
1246
1247            if(singletonRepository.getVirtualQueuesManager().amIQueueMaster(serviceName)==false){
1248                // i am not a coordinator of this queue
1249
return;
1250            }
1251            getSingletonRepository().getPostOffice().handleCoordinatorDown((QueueMaster)serviceActor, null);
1252            world.removeCoordinatedService(service);
1253        }
1254        if (!serviceActor.getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX)) {
1255            // no need to recall temp roles
1256
singletonRepository.getServiceActorControlCenter().recallService(serviceActor, this);
1257        }
1258    }//recallService
1259

1260    /**
1261     * Recalls a previously advertised durable subscriber role (consumer ) in topic service
1262     * which is not longer accessible from this layer. if at runtime you want to stop been
1263     * a durable subscriber (consumer) (i.e when going offline and do not want to get
1264     * offline messages) you SHOULD use this method to recalls the role in the service
1265     *
1266     * @throws MantaException if service not found
1267     * @see org.mr.kernel.services.ServiceConsumer
1268     */

1269    public void recallDurableSubscription(ServiceActor durableSubscription) throws MantaException {
1270
1271        WorldModeler world = singletonRepository.getWorldModeler();
1272        MantaService service =(MantaService) getService( durableSubscription.getServiceName(),durableSubscription.getServiceType());
1273        if (service == null) {
1274            throw new MantaException("No such Service " + durableSubscription.getServiceName(), MantaException.ID_INVALID_ARGUMENTS);
1275        }
1276
1277         singletonRepository.getVirtualTopicManager().removeDurableConsumer(service.getServiceName(),(ServiceConsumer) durableSubscription);
1278
1279        if (service.getConsumersByAgentId(getAgentName()).isEmpty()) {
1280            world.removeConsumedServices(service);
1281        }
1282
1283        singletonRepository.getServiceActorControlCenter().recallDurableSubscription(durableSubscription, this);
1284
1285
1286    }//recallService
1287

1288    /**
1289     * @return a new manta bus message with a new id
1290     * @see org.mr.core.protocol.MantaBusMessage
1291     * @see org.mr.core.protocol.MantaBusMessageConsts
1292     */

1293    public MantaBusMessage getMantaBusMessage() {
1294        return MantaBusMessage.getInstance();
1295    }
1296
1297
1298    /**
1299     * returns the service object for this name trys to create it if not found or null if not found and can't be created. service
1300     * object can be :QueueService or TopicService
1301     *
1302     * @param name the name of the Queue or Topic
1303     * @param serviceType needed for dynamic service creation can be MantaService.SERVICE_TYPE_QUEUE or SERVICE_TYPE_TOPIC
1304     * @return the service object for this name
1305     * @see org.mr.kernel.services.queues.QueueService
1306     * @see org.mr.kernel.services.topic.TopicService
1307     */

1308    public MantaService getService(String JavaDoc name, byte serviceType) {
1309        MantaService result = null;
1310        if(serviceType == MantaService.SERVICE_TYPE_QUEUE){
1311            result = singletonRepository.getVirtualQueuesManager().getQueueService(name);
1312        }else{
1313            result = singletonRepository.getVirtualTopicManager().getTopicService(name);
1314        }
1315        return result;
1316    }
1317
1318    /**
1319     * returns true if service is in the world modeler else false
1320     *
1321     * @param service the name of the Queue or Topic
1322     * @return true if service is in the world modeler else false
1323     * @see org.mr.kernel.services.queues.QueueService
1324     * @see org.mr.kernel.services.topic.TopicService
1325     */

1326    public boolean containsService(String JavaDoc service){
1327         return singletonRepository.getWorldModeler().containsService(singletonRepository.getWorldModeler().getDefaultDomainName(), service );
1328    }
1329
1330
1331    /**
1332     * DO NOT USE for internal use only
1333     *
1334     * @return SingletonRepository hold all the singletons in the Singletons in
1335     * the system
1336     */

1337    public SingletonRepository getSingletonRepository() {
1338        return singletonRepository;
1339    }//getSingletoneRepository
1340

1341    /**
1342     * @return a one time unique message id
1343     */

1344    public String JavaDoc getMessageId() {
1345        return String.valueOf(UniqueIDGenerator.getNextMessageID());
1346    }
1347
1348
1349    /**
1350     * gets the agent name of the current Transport layer
1351     *
1352     * @return the agent name of the current Transport layer
1353     */

1354    public String JavaDoc getAgentName() {
1355
1356        return myName;
1357    }
1358
1359
1360    /**
1361     * gets the domain name of the current Transport layer
1362     *
1363     * @return the domain name of the current Transport layer
1364     */

1365    public String JavaDoc getDomainName() {
1366        return singletonRepository.getWorldModeler().getDefaultDomainName();
1367    }
1368
1369
1370    protected DynamicRepository getDynamicRepository() {
1371        return dynamicRepository;
1372    }
1373
1374    /**
1375     * This method is used by MantaConnectionFactory to set the configuration DOm element, in case a configuration file
1376     * is not used.
1377     *
1378     * @param element Configuration XML given as a DOM element
1379     */

1380    public static void setConfiguration(Element JavaDoc element){
1381        configurationElement = element;
1382    }
1383
1384    public static boolean isStarted() {
1385        return started;
1386    }
1387}//MantaAgent
1388
Popular Tags