KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > services > queues > QueueService


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.kernel.services.queues;
47
48 import java.io.IOException JavaDoc;
49 import java.util.ArrayList JavaDoc;
50 import java.util.Enumeration JavaDoc;
51 import java.util.HashMap JavaDoc;
52 import java.util.Iterator JavaDoc;
53 import java.util.LinkedList JavaDoc;
54 import java.util.List JavaDoc;
55 import java.util.Map JavaDoc;
56
57 import javax.jms.JMSException JavaDoc;
58
59 import org.apache.commons.logging.Log;
60 import org.apache.commons.logging.LogFactory;
61 import org.mr.MantaAgent;
62 import org.mr.MantaAgentConstants;
63 import org.mr.MantaException;
64 import org.mr.api.jms.MantaConnection;
65 import org.mr.api.jms.MantaMessage;
66 import org.mr.api.jms.MantaTextMessage;
67 import org.mr.core.net.MantaAddress;
68 import org.mr.core.persistent.PersistentMap;
69 import org.mr.core.protocol.MantaBusMessage;
70 import org.mr.core.protocol.MantaBusMessageConsts;
71 import org.mr.core.protocol.MantaBusMessageUtil;
72 import org.mr.core.util.PrioritizedList;
73 import org.mr.core.util.SystemTime;
74 import org.mr.core.util.byteable.ByteableList;
75 import org.mr.kernel.delivery.DeliveryAckListener;
76 import org.mr.kernel.delivery.DeliveryAckNotifier;
77 import org.mr.kernel.delivery.PostOffice;
78 import org.mr.kernel.services.DeadLetterHandler;
79 import org.mr.kernel.services.MantaService;
80 import org.mr.kernel.services.ServiceActorControlCenter;
81 import org.mr.kernel.services.ServiceActorStatusListener;
82 import org.mr.kernel.services.ServiceConsumer;
83 import org.mr.kernel.services.ServiceProducer;
84
85 /**
86  * QueueService is a MantaRay service: a FIFO service with 1 producer and multiple consumers (see the JMS spec for more info)
87  * @since Feb 1, 2004
88  * @version 2.0 (1.0 up until MantaRay 1.6)
89  * @author Amir Shevat
90  *
91  */

92  class QueueService extends AbstractQueueService implements DeliveryAckListener, ServiceActorStatusListener, QueueServiceMBean{
93     // two strategies in case of overflow
94
// public static final int THROW_EXCEPTION_STRATERGY =0;
95
// public static final int RETURN_WITHOUT_ENQUEUE_STRATERGY =1;
96
// public static final int THROTTLE_STRATERGY =2;
97
// time to delay throttled producer
98
public static final int throttleDelay = 5;
99
100     protected Log log;
101     private PrioritizedList unsentMessages;
102     private LinkedList JavaDoc sentMessages;
103     private Map JavaDoc savedMessages;
104     private QueueSubscriberManager subscriberManager;
105     private boolean active;
106     private QueueDispatcher dispatch;
107 // a list of receivers waiting on queue data
108
private LinkedList JavaDoc queueListeners ;
109     /*
110      * The receiver that currently has unacked messages from this queue.
111      * The queue will send messages only to this QueueReceiver until all
112      * messages sent to this receiver are acked, and only then change the currentQueueReceiver
113      */

114     private ServiceConsumer currentServiceConsumer = null;
115     private Object JavaDoc currentServiceConsumerLock = null;
116
117     private QueueMaster queueMaster;
118
119     private boolean iAmQueueMaster = false;
120
121     private Object JavaDoc queueMasterLockObject = new Object JavaDoc();
122
123     private DeliveryAckNotifier ackNotifier;
124
125     private static long maxQueueSize = Long.MAX_VALUE ;
126     //Aviad - should be only in AbstractQueueService
127
//int overflowStrategy;
128
// if true, messages should not be dequeued from the queue
129
private boolean pause;
130     private Object JavaDoc pauseLockObject = new Object JavaDoc();
131     private boolean isTempQueue = false;
132
133
134     /**
135      * a new queue with a given name
136      * @param serviceName the name of the queue
137      */

138     public QueueService(String JavaDoc serviceName ) {
139         super(serviceName);
140         log=LogFactory.getLog("QueueService");
141         isTempQueue = getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX);
142         subscriberManager = new QueueSubscriberManager(this);
143         ackNotifier = MantaAgent.getInstance().getSingletonRepository().getDeliveryAckNotifier();
144         maxQueueSize = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getLongProperty("jms.max_queue_size", 1000000);
145         overflowStrategy = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getIntProperty("jms.queue_overflow_strategy",2);
146         if(!isTempQueue){
147             try {
148                 MantaAgent.getInstance().getSingletonRepository().getMantaJMXManagment().addManagedObject(this, "MantaRay:queue="+this.getServiceName());
149             } catch (MantaException e) {
150                 if(log.isErrorEnabled()){
151                     log.error("Could not create the JMX MBean 'MantaRay:queue="+this.getServiceName()+"'.",e);
152                 }
153             }
154         }
155
156
157     }
158
159     /**
160      * @return true if the queue is paused at the moment, false otherwise
161      */

162     public boolean isPaused(){
163         return this.pause;
164     }
165
166     /**
167      * Paused the queue. Pausing means producers will still be able to produce
168      * messages, but those messages will be held inside the queue and not sent
169      * to the consumers until the queue is resumed again.
170      */

171     public void pause(){
172         synchronized(pauseLockObject){
173             this.pause = true;
174         }
175     }
176
177     /**
178      * resumes a queue that was paused.
179      */

180     public void resume(){
181         synchronized(pauseLockObject){
182             this.pause = false;
183             pauseLockObject.notifyAll();
184         }
185     }
186
187     /**
188      * Purges all the messages currently held by the queue. This method
189      * can be operated only after the queue has been paused.
190      */

191     public void purge(){
192         if(unsentMessages != null)
193             this.unsentMessages.clear();
194         if(sentMessages != null)
195             this.sentMessages.clear();
196         if(savedMessages != null)
197             this.savedMessages.clear();
198     }
199
200     /**
201      * closes the queue and stops the reactor thead if it works.
202      * @throws MantaException
203      *
204      */

205     public void close() throws MantaException{
206         if(dispatch !=null){
207             dispatch.stopIt();
208         }
209         if(iAmQueueMaster){
210             MantaAgent.getInstance().recallService(this.getQueueMaster());
211             ServiceActorControlCenter.removeConsumerStatusListeners(this);
212             synchronized (unsentMessages) {
213                 Iterator JavaDoc i = sentMessages.iterator();
214                 while(i.hasNext()){
215                     MantaBusMessage msg = (MantaBusMessage) i.next();
216                     ackNotifier.removeTempListener(msg);
217                     ackNotifier=null;
218                 }
219
220             }
221         }
222         this.active = false;
223         purge();
224
225     }
226
227     /**
228      *
229      * @return
230      */

231     public List JavaDoc examineMessages(){
232         ArrayList JavaDoc list = new ArrayList JavaDoc();
233         ByteableList underlineCopy =new ByteableList();
234         underlineCopy.addAll(unsentMessages) ;
235
236         int size = underlineCopy.size();
237         for (int i = 0; i < size; i++) {
238             HashMap JavaDoc details = new HashMap JavaDoc();
239             MantaBusMessage msg =(MantaBusMessage) underlineCopy.get(i);
240             MantaMessage payload = (MantaMessage) msg.getPayload();
241             try {
242
243                 String JavaDoc id = payload.getJMSMessageID();
244                 details.put(QueueServiceMBean.MESSAGE_ID, id);
245                 if(payload instanceof MantaTextMessage){
246                     details.put(QueueServiceMBean.MESSAGE_TEXT,((MantaTextMessage)payload).getText());
247                 }
248                 HashMap JavaDoc properties = new HashMap JavaDoc();
249                 Enumeration JavaDoc propNames = payload.getPropertyNames();
250                 while(propNames.hasMoreElements()){
251                     String JavaDoc key = (String JavaDoc) propNames.nextElement();
252                     properties.put(key, payload.getStringProperty(key));
253                 }
254                 details.put(QueueServiceMBean.MESSAGE_PROPERTIES, properties);
255
256                 HashMap JavaDoc headers = new HashMap JavaDoc();
257                 headers.put("JMSCorrelationID",String.valueOf(payload.getJMSCorrelationID()));
258                 headers.put("JMSDestination",String.valueOf(payload.getJMSDestination()));
259                 headers.put("JMSReplyTo",String.valueOf(payload.getJMSReplyTo()));
260                 headers.put("JMSType",String.valueOf(payload.getJMSType()));
261                 headers.put("JMSDeliveryMode",String.valueOf(payload.getJMSDeliveryMode()) );
262                 headers.put("JMSExpiration",String.valueOf(payload.getJMSExpiration()));
263                 headers.put("JMSPriority",String.valueOf(payload.getJMSPriority()));
264                 details.put(QueueServiceMBean.MESSAGE_HEADERS, headers);
265             } catch (JMSException JavaDoc e) {
266                 // TODO Auto-generated catch block
267
e.printStackTrace();
268             }
269             list.add(details);
270         }
271         return list;
272     }
273
274 //Aviad should be only in the AbstractQueueService class
275
// //next method added by lital
276
// /**
277
// * returns the overFlowStrategy of the Queue
278
// * @return int 0 means THROW_EXCEPTION_STRATERGY,
279
// * 1 means RETURN_WITHOUT_ENQUEUE_STRATERGY
280
// * 2 means THROTTLING
281
// */
282
// public int getOverFlowStrategy(){
283
// return overflowStrategy;
284
// }
285

286 // next method added by lital
287
/**
288      * @return the max queue size
289      */

290     public long getMaxQueueSize(){
291         return maxQueueSize;
292     }
293
294     /**
295      * returns type SERVICE_TYPE_QUEUE
296      */

297     public byte getServiceType() {
298         return super.SERVICE_TYPE_QUEUE;
299     }
300
301
302
303
304     /**
305      * creates the queue and runs the dispatcher
306      * the dispatcher is the runner thread of this active queue
307      */

308     public synchronized void active(){
309         if(unsentMessages == null){
310             currentServiceConsumerLock = new Object JavaDoc();
311             boolean persistent = this.getPersistentMode() ==MantaAgentConstants.PERSISTENT;
312             if(!isTempQueue){
313                 savedMessages = new PersistentMap("queueService_"+this.getServiceName(),persistent,true);
314             }else{
315                 savedMessages = new HashMap JavaDoc();
316             }
317
318             unsentMessages = new PrioritizedList(MantaAgentConstants.TOTAL_PRIORITIES);
319             sentMessages = new LinkedList JavaDoc();
320             queueListeners = new LinkedList JavaDoc();
321             active =true;
322             ServiceActorControlCenter.addConsumerStatusListeners(this);
323             recover();
324             dispatch = new QueueDispatcher(this);
325             dispatch.start();
326         }
327
328     }//active
329

330     /**
331      * returns only when there are listeners in the queue
332      * @throws InterruptedException
333      */

334     public void waitForListeners() throws InterruptedException JavaDoc{
335         synchronized(queueListeners){
336             if(queueListeners.size()>0)
337                 return;
338             queueListeners.wait();
339             return;
340         }
341     }
342
343
344     /**
345      * register a remote agent to be a receiver of the queue
346      * @param agent the registering agent
347      * @param numberOfReceive the number after witch this receiver will be removed
348      */

349     protected void registerReceiverToQueue( ServiceConsumer consumer,long numberOfReceive ){
350
351         QueueReceiver receiver = new QueueReceiver(consumer , numberOfReceive);
352         if(numberOfReceive == 0){
353             doReceiveNoWait(receiver);
354
355         }else{
356             doHandleReceiver(receiver);
357         }
358         //log.fatal("ADD LISTENER");
359

360     }// registerReceiverToQueue
361

362     private void doHandleReceiver(QueueReceiver receiver){
363         synchronized(queueListeners){
364             queueListeners.add(receiver);
365             queueListeners.notifyAll();
366         }
367     }//doHandleReceiver
368

369     /**
370      * handles non blocking requests
371      * @param receiver
372      */

373     private void doReceiveNoWait(QueueReceiver receiver){
374         MantaBusMessage msg = null;
375         if(!pause){
376             synchronized(currentServiceConsumerLock){
377                 if(currentServiceConsumer == null ||currentServiceConsumer.getId().equals(receiver.getConsumer().getId())){
378                     currentServiceConsumer = receiver.getConsumer();
379                     synchronized(unsentMessages){
380                         int size = unsentMessages.size();
381                         for (int index = 0; index < size; index++) {
382                             msg =(MantaBusMessage) unsentMessages.get(index);
383                             if(checkValidMessage(msg ,receiver.getConsumer() )){
384                                 //we found a message for this consumer
385
unsentMessages.remove(index);
386                                 sentMessages.addFirst(msg);
387                                 break;
388                             }else{
389                                 msg = null;
390                             }
391                         }
392                     }
393                 }
394             }
395         }//if(!pause)
396

397
398         if(msg == null ){
399             msg = MantaBusMessage.getInstance();
400             msg.setPayload(null);
401             msg.setSource(this.queueMaster);
402             msg.setPriority(MantaAgentConstants.HIGH);
403             msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT);
404             msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
405             msg.addHeader(MantaBusMessageConsts.HEADER_NAME_IS_EMPTY,MantaBusMessageConsts.HEADER_VALUE_TRUE);
406
407         }
408         if(ackNotifier != null){
409             ackNotifier.setTempListener(msg, this);
410         }
411
412         receiver.receive(msg);
413     }//doReceiveNoBlock
414

415
416     /**
417      * removes all the receivers of the Consumer from a queue
418      * @param Consumer - the consumer ID
419      */

420     protected void unregisterConsumerToQueue(ServiceConsumer consumer ){
421         // we need to lockdown the queue for this
422
synchronized(unsentMessages){
423             synchronized(currentServiceConsumerLock){
424                 synchronized(queueListeners){
425                     Iterator JavaDoc receivers = queueListeners.iterator();
426
427                     while (receivers.hasNext()) {
428                         QueueReceiver rec = (QueueReceiver) receivers.next();
429                         if(rec.getConsumer().getId().equals(consumer.getId())){
430                             receivers.remove();
431                             //log.fatal("REMOVED LISTENER - unregister");
432
}
433                     }
434
435                     if(currentServiceConsumer != null &&
436                             currentServiceConsumer.getId().equals(consumer.getId())){
437                         rollback();
438                     }
439
440
441                 }
442             }
443         }
444     }// unregisterConsumerToQueue
445

446
447     public void unregisterReceiverToQueue(ServiceConsumer consumer) {
448         synchronized(currentServiceConsumerLock){
449             synchronized(queueListeners){
450                 Iterator JavaDoc receivers = queueListeners.iterator();
451
452                 while (receivers.hasNext()) {
453                     QueueReceiver rec = (QueueReceiver) receivers.next();
454                     if(rec.getConsumer().getId().equals(consumer.getId())){
455                         receivers.remove();
456                         //log.fatal("REMOVED LISTENER - unregister");
457
}//if
458
}//while
459

460             }
461         }
462
463     }//unregisterReceiverToQueue
464

465
466     private void rollback(){
467
468         // all the messges that are unacked by this reciver
469
// are returned to the metrix :)
470

471         // because the state of the sent messages is not clear,
472
// we need to make a copy of them
473
LinkedList JavaDoc tempCopy = new LinkedList JavaDoc();
474         Iterator JavaDoc sentIter = sentMessages.iterator();
475         while(sentIter.hasNext()){
476             try {
477                 MantaBusMessage rollBackMsg = (MantaBusMessage) sentIter.next();
478                 if(ackNotifier != null){
479                     ackNotifier.removeTempListener(rollBackMsg);
480                 }
481                 tempCopy.add(PostOffice.prepareMessageShallowCopy(rollBackMsg));
482             } catch (IOException JavaDoc e) {
483                 log.error("Rollback error",e);
484
485             }
486         }
487         unsentMessages.addAllToHead(tempCopy);
488         unsentMessages.notifyAll();
489         sentMessages.clear();
490
491         synchronized(currentServiceConsumerLock){
492             currentServiceConsumerLock.notifyAll();
493             currentServiceConsumer = null;
494
495         }
496     }
497
498     /**
499      * sends a copy of the queue to a remote agent
500      * @param agentName the receiving agent
501      */

502     protected void sendQueueCopy(ServiceConsumer consumer ) {
503         ByteableList underlineCopy =new ByteableList();
504         underlineCopy.addAll(unsentMessages) ;
505
506         int size = underlineCopy.size();
507         for (int i = 0; i < size; i++) {
508             MantaBusMessage msg =(MantaBusMessage) underlineCopy.get(i);
509             if(!checkValidMessage(msg ,consumer )){
510                 // remove the message if not needed
511
underlineCopy.remove(i);
512                 i--;
513                 size--;
514             }
515         }
516
517         QueueReceiver receiver = new QueueReceiver(consumer, 0);
518
519         MantaBusMessage msg = MantaBusMessage.getInstance();
520         msg.setPayload(underlineCopy);
521         msg.setPriority(MantaAgentConstants.HIGH);
522         msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT);
523         msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
524
525         MantaAddress address = new ServiceProducer(MantaAgent.getInstance().getAgentName(),this.getServiceName(),MantaService.SERVICE_TYPE_QUEUE);
526         msg.setSource(address);
527
528         receiver.receive(msg);
529
530     }//sendQueueCopy
531

532
533
534     /**
535      * @return Returns true if dispatcher thread has started.
536      */

537     public boolean isActive() {
538         return active;
539     }
540
541     /**
542      * @return Returns the queueMaster.
543      */

544     public QueueMaster getQueueMaster() {
545         return queueMaster;
546     }
547     /**
548      * @param queueMaster The queueMaster to set.
549      */

550     public void setQueueMaster(QueueMaster queueMaster) {
551         QueueMaster oldMaster = null;
552         synchronized(queueMasterLockObject){
553             if(this.queueMaster!=null){
554                 oldMaster = this.queueMaster;
555             }
556             this.queueMaster = queueMaster;
557             if(queueMaster != null){
558                 queueMasterLockObject.notifyAll();
559                 if(queueMaster.getAgentName().equals(MantaAgent.getInstance().getAgentName())){
560                     iAmQueueMaster = true;
561                 }
562                 subscriberManager.queueCoordinatorFound(queueMaster);
563             }else{
564                 if(iAmQueueMaster){
565                     iAmQueueMaster = false;
566                 }else if(isTempQueue){
567                     try {
568                         MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager().closeQueue(getServiceName());
569                     } catch (MantaException e) {
570                         log.error("Failed to close temp queue",e);
571                     }
572                 }
573             }
574             if (oldMaster != null) {
575                 // kill the po box of the old queue master
576
MantaAgent.getInstance().getSingletonRepository().getPostOffice().handleCoordinatorDown(oldMaster, this.queueMaster);
577             }
578
579         }
580     }
581
582     public boolean amIQueueMaster(){
583
584             return iAmQueueMaster;
585
586     }
587
588     public void waitForQueueMaster(long timeToWait) throws InterruptedException JavaDoc{
589         synchronized(queueMasterLockObject){
590             if(queueMaster == null){
591                 queueMasterLockObject.wait(timeToWait);
592
593             }// if
594
}// synchronized
595
}
596
597     public QueueSubscriberManager getSubscriberManager() {
598         return subscriberManager;
599     }
600
601
602     /**
603      * this method dequeues a message and links it with a proper receiver
604      */

605     public void doDequeue() throws InterruptedException JavaDoc {
606         // if this queue has receivers then dequeue and send to the receiver
607
waitForMessages();
608         QueueReceiver receiver = findEligibleReceiver();
609
610         if(receiver == null){
611             // check if the consumer that is locking the queue is
612
synchronized(unsentMessages){
613                 synchronized(currentServiceConsumerLock){
614                     if(currentServiceConsumer !=null){
615                         if(!ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)){
616                             rollback();
617                         }
618                     }
619                 }
620             }
621             Thread.sleep(100);
622             return;
623         }
624         boolean fed;
625         synchronized(unsentMessages){
626             fed = feedReceiver(receiver);
627         }
628         // return the receiver to the end of the list
629
if(receiver.getNumberOfReceive()>0){
630             synchronized(queueListeners){
631                 if(ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)){
632                     queueListeners.addLast(receiver);
633
634                 }else{
635                     // This listener is not currently advertised
636
// we need to wait a while and see if he will be re-advertised
637
Thread.sleep(1500);
638                     if(ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)){
639                         queueListeners.addLast(receiver);
640                     }//else{
641
// log.fatal("REMOVED LISTENER - consumer down");
642
//}
643

644                 }
645             }
646         }//else{
647
// log.fatal("REMOVED LISTENER - consumer done");
648
//}
649

650         if(fed){
651             synchronized(unsentMessages){
652                     if(sentMessages.size()>0){
653                         synchronized(currentServiceConsumerLock){
654                             currentServiceConsumer = receiver.getConsumer();
655                         }
656                     }
657             }
658
659         }else{
660             // we need to rest and wait
661
Thread.sleep(100);
662         }
663     }//doDequeue
664

665     private void checkNotPause() {
666         synchronized(pauseLockObject){
667             if(this.pause){
668                 try {
669                     pauseLockObject.wait();
670                 } catch (InterruptedException JavaDoc e) {
671                     // do nothing
672
}
673             }
674         }
675
676
677     }
678
679     private QueueReceiver findEligibleReceiver() throws InterruptedException JavaDoc{
680         waitForListeners();
681         checkNotPause();
682         synchronized(currentServiceConsumerLock){
683
684             synchronized(queueListeners){
685                 if(currentServiceConsumer ==null){
686                     try{
687                         return (QueueReceiver) queueListeners.removeFirst();
688                     }catch(Exception JavaDoc e){
689                         return null;
690                     }
691
692                 }
693
694                 Iterator JavaDoc receivers = queueListeners.iterator();
695                 while(receivers.hasNext()){
696                     QueueReceiver receiver = (QueueReceiver) receivers.next();
697                     if(receiver.getConsumer().getId().equals(currentServiceConsumer.getId())){
698                         receivers.remove();
699                         return receiver;
700                     }
701                 }
702             }//synchronized
703
}
704         return null;
705
706     }
707
708     private void waitForMessages() throws InterruptedException JavaDoc {
709         synchronized(unsentMessages){
710             if(unsentMessages.size()>0)
711                 return;
712             unsentMessages.wait();
713             return;
714         }
715
716     }
717
718
719     /**
720      * sends 1 message to the receiver while keeping order and selector
721      * @param receiver the reference object to the receiving mantaRay layer
722      * @return true if able to send 1 message to receiver
723      */

724     private boolean feedReceiver(QueueReceiver receiver) {
725         int size = unsentMessages.size();
726         MantaBusMessage msg = null;
727
728         for (int index = 0; index < size; index++) {
729             msg =(MantaBusMessage) unsentMessages.get(index);
730             // if old message
731
if(msg.getValidUntil() <SystemTime.gmtCurrentTimeMillis()){
732                 if(log.isInfoEnabled()){
733                     log.info("Not sending message "+msg +" msg.getValidUntil()=" +msg.getValidUntil()+ " SystemTime.gmtCurrentTimeMillis()=" +SystemTime.gmtCurrentTimeMillis()+".");
734                 }
735                 unsentMessages.remove(index);
736                 DeadLetterHandler.HandleDeadMessage(msg);
737                 return false;
738             }
739             if(checkValidMessage(msg ,receiver.getConsumer() )){
740                 //we found a message for this consumer
741
unsentMessages.remove(index);
742                 sentMessages.addFirst(msg);
743                 if(ackNotifier!=null)
744                     ackNotifier.setTempListener(msg, this);
745                 receiver.receive(msg);
746                 return true;
747             }
748         }
749
750         return false;
751     }
752
753     protected void enqueue(MantaBusMessage enqueuedMessage, boolean persistent) {
754         synchronized(unsentMessages){
755             if(!isTempQueue){
756                 ((PersistentMap)savedMessages).put(enqueuedMessage.getMessageId(),enqueuedMessage, persistent);
757             }else{
758                 savedMessages.put(enqueuedMessage.getMessageId(),enqueuedMessage);
759             }
760
761             unsentMessages.add(enqueuedMessage);
762             unsentMessages.notifyAll();
763         }
764     }
765
766     /**
767      * checks if size of queue is too big
768      * @return true if invoking method should return without enqueueing
769      */

770     public boolean isOverflow() {
771         int size = unsentMessages.size();
772         if(size < maxQueueSize ){
773             return false;
774         }
775         return true;
776
777     }
778
779     /**
780      * remove the acked message and release the current Receiver so others can share
781      */

782     public void gotAck(MantaBusMessage msg, MantaAddress source) {
783         synchronized(unsentMessages){
784             savedMessages.remove(msg.getMessageId());
785             sentMessages.remove(msg);
786             if(sentMessages.size() == 0){
787                 synchronized( currentServiceConsumerLock){
788                     if(currentServiceConsumer!= null ){
789                         currentServiceConsumerLock.notifyAll();
790                         currentServiceConsumer = null;
791                     }
792                 }
793             }
794         }
795     }//gotAck
796

797      /**
798       * re-queue the acked message and release the current receiver so
799       * others can share
800       */

801      public void gotAckReject(MantaBusMessage msg, MantaAddress source) {
802          synchronized(unsentMessages) {
803              msg.setDeliveryCount(msg.getDeliveryCount()+1);
804              unsentMessages.addToHead(msg);
805              sentMessages.remove(msg);
806              if (sentMessages.size() == 0) {
807                  synchronized (currentServiceConsumerLock) {
808                      if (currentServiceConsumer!= null) {
809                          currentServiceConsumerLock.notifyAll();
810                          currentServiceConsumer = null;
811                      }
812                  }
813              }
814          }
815      }
816
817     /**
818      * checks if we need to send or resend messages
819      */

820     public synchronized void recover() {
821
822         if(savedMessages.isEmpty()){
823             return;
824         }
825         ArrayList JavaDoc tempList = new ArrayList JavaDoc();
826         synchronized(savedMessages){
827             tempList.addAll(savedMessages.values());
828         }
829         //Aviad use another sort method - to sort by enqueue time and not send time
830
MantaBusMessageUtil.sortMessagesByEnqueueTime(tempList,VirtualQueuesManager.ENQUEUE_TIME);
831         //MantaBusMessageUtil.sortMessagesBySendTime(tempList);
832

833         int size = tempList.size();
834         for (int i = 0; i < size; i++) {
835             MantaBusMessage msg = (MantaBusMessage)tempList.get(i);
836             long now = SystemTime.gmtCurrentTimeMillis() ;
837             if( (msg.getValidUntil() <now ) ){
838                 if(log.isInfoEnabled()){
839                     log.info("Not sending message "+msg +" msg.getValidUntil()=" +msg.getValidUntil()+ " SystemTime.gmtCurrentTimeMillis()=" +SystemTime.gmtCurrentTimeMillis()+".");
840                 }
841                 savedMessages.remove(msg.getMessageId());
842                 DeadLetterHandler.HandleDeadMessage(msg);
843
844             }// end of dead old messge
845
else
846
847             // send the message
848
unsentMessages.add(msg);
849
850         }//for
851

852     }//resendIfNeeded
853

854
855     public void handleConsumerUp(ServiceConsumer consumer) {
856         // nothing to do
857

858     }
859
860
861     public void handleConsumerDown(ServiceConsumer consumer) {
862         unregisterConsumerToQueue(consumer);
863
864     }
865
866
867
868     public int getUnsentCount() {
869         return unsentMessages.size();
870     }
871
872
873
874     public String JavaDoc toString(){
875         StringBuffer JavaDoc buff = new StringBuffer JavaDoc();
876         buff.append(" service{");
877         buff.append(" service name=");
878         buff.append(logicalName);
879         buff.append(" serviceType=");
880         buff.append(getServiceType());
881         buff.append(" consumers=");
882         buff.append(consumers);
883         buff.append(" producers=");
884         buff.append(producers);
885         buff.append(" coordinator=");
886         buff.append(queueMaster);
887         buff.append(" persistentMode=");
888         buff.append(super.getPersistentMode());
889         buff.append(" }");
890         return buff.toString();
891     }
892
893
894     public boolean isTempQueue() {
895         return isTempQueue;
896     }
897 }
898
Popular Tags