KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > services > queues > VirtualQueuesManager


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat .
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.kernel.services.queues;
47
48 import java.util.Collections JavaDoc;
49 import java.util.HashMap JavaDoc;
50 import java.util.HashSet JavaDoc;
51 import java.util.Iterator JavaDoc;
52 import java.util.Map JavaDoc;
53 import java.util.Set JavaDoc;
54
55 import org.apache.commons.logging.Log;
56 import org.apache.commons.logging.LogFactory;
57 import org.mr.MantaAgent;
58 import org.mr.MantaAgentConstants;
59 import org.mr.MantaException;
60 import org.mr.core.protocol.DeadEndRecipient;
61 import org.mr.core.protocol.MantaBusMessage;
62 import org.mr.core.protocol.MantaBusMessageConsts;
63 import org.mr.core.util.SystemTime;
64 import org.mr.core.util.byteable.ByteableList;
65 import org.mr.kernel.services.MantaService;
66 import org.mr.kernel.services.ServiceConsumer;
67 import org.mr.kernel.services.ServiceProducer;
68 import org.mr.kernel.world.WorldModeler;
69
70
71 /**
72  * This class represents a collection of Queues, these Queues are the concrete implementation
73  * of the queue concept as taken from the p2p domain.
74  *
75  *
76  * @version 1.0
77  * @since Jan 13, 2004
78  * @author Amir Shevat
79  *
80  */

81 public class VirtualQueuesManager {
82
83     private Map JavaDoc virtualQueueServicesMap ;
84     public Log log = null;
85
86     public static final String JavaDoc ENQUEUE_ACK_HEADER ="enqueue_ack";
87     public static final byte ENQUEUE_OK = 1;
88     public static final byte NOT_MASTER = 2;
89     public static final byte ENQUEUE_FAIL = 3;
90     private static long enqueueWaitforCoordinator = -1;
91
92     // when a message is enqueued we change the source of the message to the master and keep this originator under this key
93
public static final String JavaDoc ORIGINAL_MESSAGE_PRUDUCER = "msg_org";
94
95     public static final String JavaDoc ENQUEUE_TIME = "enq_time";
96
97     public VirtualQueuesManager(){
98         virtualQueueServicesMap = Collections.synchronizedMap(new HashMap JavaDoc());
99         enqueueWaitforCoordinator = MantaAgent.getInstance().getSingletonRepository()
100             .getConfigManager().getLongProperty("jms.enqueueWaitForCoordinator",-1);
101         log=LogFactory.getLog("VirtualQueuesManager");
102
103     }//VirtualQueuesManager
104

105
106
107     /**
108      * returns a Queue of a given name
109      * @param queueName the name of the queue to be returnd
110      * @return the queue of a given name
111      */

112     public synchronized AbstractQueueService getQueueService(String JavaDoc queueName){
113         AbstractQueueService result =(AbstractQueueService) virtualQueueServicesMap.get(queueName);
114         if(result == null){
115             MantaAgent manta = MantaAgent.getInstance();
116             result = (AbstractQueueService) manta.getSingletonRepository().getWorldModeler().getService(manta.getDomainName(),queueName,MantaService.SERVICE_TYPE_QUEUE );
117             if(result !=null){
118                 virtualQueueServicesMap.put(queueName, result);
119                 if(log.isDebugEnabled()){
120                     log.debug("Added queue "+queueName);
121                 }
122             }
123
124         }
125         return result;
126     }
127
128     public Set JavaDoc getQueueServices(){
129         return new HashSet JavaDoc(virtualQueueServicesMap.values());
130     }
131
132     /**
133      * register a remote agent to be a receiver of the queue
134      * @param queueName the queue that this agent waht to register too
135      * @param consumer the registering agent consumer object to this queue
136      * @param numberOfReceive the number after witch this receiver will be removed
137      */

138     public boolean registerReceiverToQueue( ServiceConsumer consumer ,long numberOfReceive ){
139         AbstractQueueService service = getQueueService(consumer.getServiceName());
140         if(service == null){
141             if(log.isErrorEnabled()){
142                 log.error("Got receive request on queue that this layer is not a coordinator of - queue = "+consumer.getServiceName());
143             }
144             return false;
145         }
146         service.active();
147         service.registerReceiverToQueue(consumer , numberOfReceive );
148         return true;
149     }// registerReceiverToQueue
150

151     /**
152      * removes a receiver from a queue
153      * @param consumer the registering agent consumer object to this queue
154      * @param receiverId
155      * @param agent the reciver name
156      */

157     public void unregisterReceiverToQueue( ServiceConsumer consumer ){
158         AbstractQueueService service = getQueueService(consumer.getServiceName());
159         if(service == null){
160             if(log.isErrorEnabled()){
161                 log.error("Got un-register request on queue that this layer is not a coordinator of - queue = "+consumer.getServiceName());
162             }
163             return ;
164         }
165         service.active();
166         service.unregisterReceiverToQueue(consumer );
167     }// unregisterReceiverToQueue
168

169
170     public void closeQueue(String JavaDoc queueName) throws MantaException{
171         AbstractQueueService queue =(AbstractQueueService) virtualQueueServicesMap.get(queueName);
172         if(queue != null){
173             queue.close();
174             virtualQueueServicesMap.remove(queueName);
175             WorldModeler wm=MantaAgent.getInstance().getSingletonRepository().getWorldModeler();
176             wm.removeService(wm.getDefaultDomainName(),queueName);
177             if(log.isDebugEnabled()){
178                 log.debug("Deleted queue "+queueName);
179             }
180         }
181     }
182
183     /**
184      * sends a copy of the queue to a remote agent
185      * @param queueName the queue to be copied
186      * @param agentName the receiving agent
187      */

188     public void sendQueueCopy(ServiceConsumer consumer) {
189         AbstractQueueService service = getQueueService(consumer.getServiceName());
190         ByteableList underline;
191         if(service == null){
192             if(log.isErrorEnabled()){
193                 log.error("Got sendQueueCopy request on queue that this layer is not a coordinator of - queue = "+consumer.getServiceName());
194             }
195
196             underline = new ByteableList();
197             QueueReceiver receiver = new QueueReceiver(consumer , 0);
198
199             MantaBusMessage msg = MantaBusMessage.getInstance();
200             msg.setPayload(underline);
201             msg.setPriority(MantaAgentConstants.HIGH);
202             msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT);
203             msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
204             receiver.receive(msg);
205         }else{
206             service.active();
207             service.sendQueueCopy(consumer);
208         }
209
210
211
212     }//sendQueueCopy
213

214
215     /**
216      * This method returns the number of queues currently defined in the agent.
217      * @return
218      */

219     public int getNumberOfQueueServices(){
220         return virtualQueueServicesMap.size();
221     }
222
223     /**
224      * returns true if a given queue name exist in the manager
225      * @param queueName the name of the queue
226      * @return true if a given queue name exist in this manager, else false.
227      */

228     public boolean hasQueueService(String JavaDoc queueName){
229         return virtualQueueServicesMap.containsKey(queueName);
230     }
231
232     /**
233      * This method is used internally by the add() method.
234      * @param Queue
235      */

236     public void addNewQueueServiceToMap( AbstractQueueService service){
237         //service.active();
238
virtualQueueServicesMap.put(service.getServiceName(),service);
239     }//addNewQueueToMap
240

241     /**
242      * Waits for a given time for a queue coordinator to show up
243      * @param queueName the name of the queue
244      * @param timeToWait the time to wait
245      * @throws InterruptedException
246      */

247     public void waitForQueueMaster(String JavaDoc queueName, long timeToWait) throws InterruptedException JavaDoc{
248         getQueueService(queueName).waitForQueueMaster(timeToWait);
249     }
250
251     /**
252      * sets a new queue coordinator to a given queue
253      * @param queueName the name of the queue
254      * @param queueMaster the coordinator of the queue
255      */

256     public void setQueueMaster(String JavaDoc queueName,QueueMaster queueMaster){
257         getQueueService(queueName).setQueueMaster(queueMaster);
258     }
259
260     /**
261      * return the coordinator of this queue or null if non
262      * @param queueName the name of the queue
263      */

264     public QueueMaster getQueueMaster(String JavaDoc queueName){
265         return getQueueService(queueName).getQueueMaster();
266     }
267
268
269
270
271     /**
272      * enqueues the message and sends response to the producer
273      * @param producer the producer of the messsage
274      * @param enqueuedMessage the message to be enqueued
275      * @param a Piggyback ack to the control message
276      */

277     public void handleEnqueueMessageToQueue(String JavaDoc controlId ,ServiceProducer producer, QueueMaster master, MantaBusMessage enqueuedMessage, String JavaDoc responseToId) {
278         boolean ok = false;
279         boolean deley = false;
280         try{
281             String JavaDoc queueName = producer.getServiceName();
282             AbstractQueueService queue = getQueueService(queueName);
283             if(queue == null ||!queue.amIQueueMaster() ){
284                 // precondition fails send resp and return
285
sendEnqueueResp(controlId,producer,master ,enqueuedMessage,NOT_MASTER ,responseToId,deley );
286                 return;
287             }
288 // when a message is enqueued we change the source of the message to the master and keep this originator here
289
enqueuedMessage.addHeader(ORIGINAL_MESSAGE_PRUDUCER,((ServiceProducer)enqueuedMessage.getSource()).getId() );
290             enqueuedMessage.setSource(queue.getQueueMaster());
291             byte deliveryMode =enqueuedMessage.getDeliveryMode();
292             //Aviad add enqueue time - so queue order will be kept by enqueue time
293
enqueuedMessage.addHeader(ENQUEUE_TIME,SystemTime.currentTimeMillis()+"");
294             boolean persistent = deliveryMode == MantaAgentConstants.PERSISTENT;
295             // if queue is persistent then the message must be
296
if (queue.getPersistentMode() == MantaAgentConstants.PERSISTENT)
297                 persistent = true;
298             // make sure the queue is active, in most cases it will return in no time
299
queue.active();
300
301
302
303             ok = !queue.isOverflow();
304             if(ok){
305                 queue.enqueue(enqueuedMessage, persistent);
306                 if (log.isDebugEnabled()) {
307                     log.debug("Message is added to queue. Message ID="+enqueuedMessage.getMessageId()+", Queue="+queue.getServiceName());
308                 }
309             }else{
310                 String JavaDoc overFlawMsg = "Queue overflow. Queue name="+queue.getServiceName()+", Queue size="+queue.getUnsentCount();
311                 if(queue.getOverflowStrategy() ==
312                    AbstractQueueService.THROW_EXCEPTION_STRATERGY
313                    || queue.getOverflowStrategy() ==
314                           AbstractQueueService.
315                           RETURN_WITHOUT_ENQUEUE_STRATERGY) {
316                     if (log.isWarnEnabled()) {
317                         log.warn(overFlawMsg+". Message droped.");
318                     }
319                 }
320                 else if(queue.getOverflowStrategy() ==
321                         AbstractQueueService.THROTTLE_STRATERGY) {
322                     queue.enqueue(enqueuedMessage, persistent);
323                     if (log.isDebugEnabled()) {
324                         log.debug("Message is added to queue. Message ID="+enqueuedMessage.getMessageId()+", Queue="+queue.getServiceName());
325                         log.debug(overFlawMsg+". Initiating throttling strategy.");
326                     }
327
328                     deley= true;
329                     ok = true;
330                 }
331
332
333             }
334
335
336         }catch(Throwable JavaDoc t){
337             if(log.isErrorEnabled())
338                 log.error("Could not enqueue message. Message ID="+enqueuedMessage.getMessageId()+". ",t);
339         }
340         if(ok){
341             sendEnqueueResp( controlId , producer,master, enqueuedMessage ,ENQUEUE_OK,responseToId ,deley);
342         }else{
343             sendEnqueueResp( controlId , producer,master, enqueuedMessage ,ENQUEUE_FAIL,responseToId ,deley);
344         }
345
346
347     }
348
349
350     /**
351      * internal use for control messages
352      * do not use
353      */

354     private void sendEnqueueResp(String JavaDoc controlId ,ServiceProducer producer,QueueMaster master, MantaBusMessage enqueuedMessage ,byte respCode, String JavaDoc responseToId, boolean delay) {
355         if(log.isDebugEnabled()){
356             log.debug("Got message enqueue and responded with code "+respCode+" message was "+enqueuedMessage+".");
357         }
358         MantaBusMessage controlMsg = MantaBusMessage.getInstance();
359
360         controlMsg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
361
362
363         controlMsg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT);
364         controlMsg.setPriority(MantaAgentConstants.HIGH);
365         controlMsg.setValidUntil( MantaAgentConstants.CONTROL_MESSAGES_TTL+SystemTime.gmtCurrentTimeMillis());
366         controlMsg.setSource(master);
367         controlMsg.setRecipient(DeadEndRecipient.createDeadEndRecipient(producer.getAgentName(),producer.getDomainName()) );
368         controlMsg.addHeader(MantaBusMessageConsts.HEADER_NAME_ACK_RESPONSE_REFERENCE , responseToId);
369         controlMsg.addHeader(MantaBusMessageConsts.HEADER_NAME_LOGICAL_DESTINATION, producer.getServiceName()+controlId);
370         //Aviad added this - send enqueue result to producer
371
controlMsg.addHeader(MantaBusMessageConsts.ENQUEUE_STATUS,""+respCode);
372         if(!delay){
373             MantaAgent.getInstance().send(controlMsg,master );
374         }else{
375             MantaAgent.getInstance().getSingletonRepository()
376                 .getDelayedMessageSender().send(controlMsg, QueueService.throttleDelay);
377         }
378
379     }
380
381     public QueueSubscriberManager getSubscriberManager(String JavaDoc queueName) {
382         return getQueueService(queueName).getSubscriberManager();
383     }
384     /**
385      * activates the queue reactor thread
386      * @param serviceName the name of the queue
387      */

388     public void active(String JavaDoc serviceName) {
389         getQueueService(serviceName).active();
390
391     }
392
393     public boolean amIQueueMaster(String JavaDoc serviceName) {
394         return getQueueService(serviceName).amIQueueMaster();
395     }
396
397
398     public boolean isTempQueue(String JavaDoc serviceName){
399         return getQueueService(serviceName).isTempQueue();
400     }
401
402
403
404
405     public static long getEnqueueWaitforCoordinator() {
406         return enqueueWaitforCoordinator;
407     }
408 }//VirtualQueuesManager
409

410
Popular Tags