KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > services > queues > ForeignQueueService


1 /*
2  * Copyright Coridan Inc.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License as
6  * published by the Free Software Foundation; either version 2 of the
7  * License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17  * USA
18  *
19  * Support at:
20  * http://sourceforge.net/projects/mantaray/
21  * Or
22  * support@coridan.com
23  */

24 package org.mr.kernel.services.queues;
25
26 import java.io.IOException JavaDoc;
27 import java.util.ArrayList JavaDoc;
28 import java.util.Enumeration JavaDoc;
29 import java.util.HashMap JavaDoc;
30 import java.util.Iterator JavaDoc;
31 import java.util.LinkedList JavaDoc;
32 import java.util.List JavaDoc;
33 import java.util.Map JavaDoc;
34
35 import javax.jms.JMSException JavaDoc;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.mr.MantaAgent;
40 import org.mr.MantaAgentConstants;
41 import org.mr.MantaException;
42 import org.mr.api.jms.MantaConnection;
43 import org.mr.api.jms.MantaMessage;
44 import org.mr.api.jms.MantaTextMessage;
45 import org.mr.core.net.MantaAddress;
46 import org.mr.core.persistent.PersistentMap;
47 import org.mr.core.protocol.MantaBusMessage;
48 import org.mr.core.protocol.MantaBusMessageConsts;
49 import org.mr.core.protocol.MantaBusMessageUtil;
50 import org.mr.core.util.PrioritizedList;
51 import org.mr.core.util.SystemTime;
52 import org.mr.core.util.byteable.ByteableList;
53 import org.mr.kernel.delivery.DeliveryAckListener;
54 import org.mr.kernel.delivery.DeliveryAckNotifier;
55 import org.mr.kernel.delivery.PostOffice;
56 import org.mr.kernel.services.MantaService;
57 import org.mr.kernel.services.ServiceActorControlCenter;
58 import org.mr.kernel.services.ServiceActorStatusListener;
59 import org.mr.kernel.services.ServiceConsumer;
60 import org.mr.kernel.services.ServiceProducer;
61 import org.mr.kernel.services.DeadLetterHandler;
62
63 /**
64  * QueueService is a manta service is a FIFO service with 1 producer and multiple consumers (see jms spec for for info)
65  *
66  * @author Amir Shevat
67  * @version 2.0 (1.0 up untill MantaRay 1.6)
68  * @since Feb 1, 2004
69  */

70 /*
71  * TODO
72  * + remove savedMessages: no persistency here
73  * + replace unsentMessages operations with impl operations (currently at
74  * sendQueueCopy)
75  * + find another lock object instead of unsentMessages
76  * + impl factory
77  * - waitForMessages() - handle the case where a foreign messages is queued.
78  * + create interface to unite ForeignQueueService and QueueService
79  * - create from configuration
80  */

81
82 class ForeignQueueService extends AbstractQueueService implements DeliveryAckListener, ServiceActorStatusListener, QueueServiceMBean {
83     // two strategies in case of overflow
84
// public static final int THROW_EXCEPTION_STRATERGY =0;
85
// public static final int RETURN_WITHOUT_ENQUEUE_STRATERGY =1;
86
// public static final int THROTTLE_STRATERGY =2;
87
// time to delay throttled producer
88
public static final int throttleDelay = 5;
89
90     protected Log log;
91 // private PrioritizedList unsentMessages;
92
private LinkedList JavaDoc sentMessages;
93 // private Map savedMessages;
94
private QueueSubscriberManager subscriberManager;
95     private boolean active;
96     private QueueDispatcher dispatch;
97 // a list of receivers waiting on queue data
98
private LinkedList JavaDoc queueListeners;
99     /*
100       * The reciver that is currently has unacked messages from this queue.
101       * The queue will send messages only to this QueueReceiver untill all
102       * messages sent to this reciver are acked and then change the currentQueueReceiver
103       */

104     private ServiceConsumer currentServiceConsumer = null;
105     private Object JavaDoc currentServiceConsumerLock = null;
106
107     private QueueMaster queueMaster;
108
109     private boolean iAmQueueMaster = false;
110
111     private Object JavaDoc queueMasterLockObject = new Object JavaDoc();
112
113     private DeliveryAckNotifier ackNotifier;
114
115     private static long maxQueueSize = Long.MAX_VALUE;
116     // Aviad - should be only in AbstractQueueService
117
//int overflowStrategy;
118
// if true messages should not be dequeue form queue
119
private boolean pause;
120     private Object JavaDoc pauseLockObject = new Object JavaDoc();
121     private boolean isTempQueue = false;
122     private ForeignQueueImpl impl;
123
124     /**
125      * a new queue with a given name
126      *
127      * @param serviceName the name of the queue
128      */

129     public ForeignQueueService(String JavaDoc serviceName, ForeignQueueImpl impl) {
130         super(serviceName);
131         this.impl = impl;
132         log = LogFactory.getLog("QueueService");
133         isTempQueue = getServiceName().startsWith(MantaConnection.TMP_DESTINATION_PREFIX);
134         subscriberManager = new QueueSubscriberManager(this);
135         ackNotifier = MantaAgent.getInstance().getSingletonRepository().getDeliveryAckNotifier();
136         maxQueueSize = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getLongProperty("jms.max_queue_size", 1000000);
137         overflowStrategy = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getIntProperty("jms.queue_overflow_strategy", 2);
138         if (!isTempQueue) {
139 // try {
140
// MantaAgent.getInstance().getSingletonRepository().getMantaJMXManagment().addManagedObject(this, "MantaRay:queue="+this.getServiceName());
141
// } catch (MantaException e) {
142
// if(log.isErrorEnabled()){
143
// log.error("Could not create the JMX MBean 'MantaRay:queue="+this.getServiceName()+"'.",e);
144
// }
145
// }
146
}
147
148
149     }
150
151     /**
152      * @return true if the queue is paused at the moment, false otherwise
153      */

154     public boolean isPaused() {
155         return this.pause;
156     }
157
158     /**
159      * Paused the queue. Pausing means producers will still be able to produce
160      * messages, but those messages will be held inside the queue and not sent
161      * to the consumers until the queue is resumed again.
162      */

163     public void pause() {
164         synchronized (pauseLockObject) {
165             this.pause = true;
166         }
167     }
168
169     /**
170      * resumes a queue that was paused.
171      */

172     public void resume() {
173         synchronized (pauseLockObject) {
174             this.pause = false;
175             pauseLockObject.notifyAll();
176         }
177     }
178
179     /**
180      * Purges all the messages currently held by the queue. This method
181      * can be operated only after the queue has been paused.
182      */

183     public void purge() {
184         // do nothing: foreign messages are left intact
185
// if(unsentMessages != null)
186
// this.unsentMessages.clear();
187
// if(sentMessages != null)
188
// this.sentMessages.clear();
189
// if(savedMessages != null)
190
// this.savedMessages.clear();
191
}
192
193     /**
194      * closes the queue and stops the reactor thead if it works.
195      *
196      * @throws MantaException
197      */

198     public void close() throws MantaException {
199         if (dispatch != null) {
200             dispatch.stopIt();
201         }
202         if (iAmQueueMaster) {
203             MantaAgent.getInstance().recallService(this.getQueueMaster());
204             ServiceActorControlCenter.removeConsumerStatusListeners(this);
205             // TODO synchronize
206
synchronized (this.impl) {
207                 Iterator JavaDoc i = sentMessages.iterator();
208                 while (i.hasNext()) {
209                     MantaBusMessage msg = (MantaBusMessage) i.next();
210                     ackNotifier.removeTempListener(msg);
211                     ackNotifier = null;
212                 }
213
214             }
215         }
216         this.active = false;
217         purge();
218
219     }
220
221     /**
222      * @return
223      */

224     public List JavaDoc examineMessages() {
225         System.err.println("ForeignQueueService: not implemented: " +
226                 "examineMessages");
227 // ArrayList list = new ArrayList();
228
// ByteableList underlineCopy =new ByteableList();
229
// underlineCopy.addAll(unsentMessages) ;
230

231 // int size = underlineCopy.size();
232
// for (int i = 0; i < size; i++) {
233
// HashMap details = new HashMap();
234
// MantaBusMessage msg =(MantaBusMessage) underlineCopy.get(i);
235
// MantaMessage payload = (MantaMessage) msg.getPayload();
236
// try {
237

238 // String id = payload.getJMSMessageID();
239
// details.put(QueueServiceMBean.MESSAGE_ID, id);
240
// if(payload instanceof MantaTextMessage){
241
// details.put(QueueServiceMBean.MESSAGE_TEXT,((MantaTextMessage)payload).getText());
242
// }
243
// HashMap properties = new HashMap();
244
// Enumeration propNames = payload.getPropertyNames();
245
// while(propNames.hasMoreElements()){
246
// String key = (String) propNames.nextElement();
247
// properties.put(key, payload.getStringProperty(key));
248
// }
249
// details.put(QueueServiceMBean.MESSAGE_PROPERTIES, properties);
250

251 // HashMap headers = new HashMap();
252
// headers.put("JMSCorrelationID",String.valueOf(payload.getJMSCorrelationID()));
253
// headers.put("JMSDestination",String.valueOf(payload.getJMSDestination()));
254
// headers.put("JMSReplyTo",String.valueOf(payload.getJMSReplyTo()));
255
// headers.put("JMSType",String.valueOf(payload.getJMSType()));
256
// headers.put("JMSDeliveryMode",String.valueOf(payload.getJMSDeliveryMode()) );
257
// headers.put("JMSExpiration",String.valueOf(payload.getJMSExpiration()));
258
// headers.put("JMSPriority",String.valueOf(payload.getJMSPriority()));
259
// details.put(QueueServiceMBean.MESSAGE_HEADERS, headers);
260
// } catch (JMSException e) {
261
// // TODO Auto-generated catch block
262
// e.printStackTrace();
263
// }
264
// list.add(details);
265
// }
266
// return list;
267
return new ArrayList JavaDoc();
268     }
269
270     //next method added by lital
271

272 // Should be only in the AbstractQueueService
273
// /**
274
// * returns the overFlowStrategy of the Queue
275
// *
276
// * @return int 0 means THROW_EXCEPTION_STRATERGY,
277
// * 1 means RETURN_WITHOUT_ENQUEUE_STRATERGY
278
// * 2 means THROTTLING
279
// */
280
// public int getOverFlowStrategy() {
281
// return overflowStrategy;
282
// }
283

284 // next method added by lital
285

286     /**
287      * @return the max queue size
288      */

289     public long getMaxQueueSize() {
290         return maxQueueSize;
291     }
292
293     /**
294      * returns type SERVICE_TYPE_QUEUE
295      */

296     public byte getServiceType() {
297         return super.SERVICE_TYPE_QUEUE;
298     }
299
300
301     /**
302      * creates the queue and runs the dispatcher
303      * the dispatcher is the runner thread of this active queue
304      */

305     public synchronized void active() {
306 // if(unsentMessages == null){
307
if (!this.active) {
308             currentServiceConsumerLock = new Object JavaDoc();
309             boolean persistent = this.getPersistentMode() == MantaAgentConstants.PERSISTENT;
310 // if(!isTempQueue){
311
// savedMessages = new PersistentMap("queueService_"+this.getServiceName(),persistent,true);
312
// }else{
313
// savedMessages = new HashMap();
314
// }
315

316 // unsentMessages = new PrioritizedList(MantaAgentConstants.TOTAL_PRIORITIES);
317
this.impl.init();
318             sentMessages = new LinkedList JavaDoc();
319             queueListeners = new LinkedList JavaDoc();
320             active = true;
321             ServiceActorControlCenter.addConsumerStatusListeners(this);
322             recover();
323             dispatch = new QueueDispatcher(this);
324             dispatch.start();
325         }
326
327     }//active
328

329     /**
330      * returns only when there are listeners in the queue
331      *
332      * @throws InterruptedException
333      */

334     public void waitForListeners() throws InterruptedException JavaDoc {
335         synchronized (queueListeners) {
336             if (queueListeners.size() > 0)
337                 return;
338             queueListeners.wait();
339             return;
340         }
341     }
342
343
344     /**
345      * register a remote agent to be a receiver of the queue
346      *
347      * @param agent the registering agent
348      * @param numberOfReceive the number after witch this receiver will be removed
349      */

350     protected void registerReceiverToQueue(ServiceConsumer consumer, long numberOfReceive) {
351
352         QueueReceiver receiver = new QueueReceiver(consumer, numberOfReceive);
353         if (numberOfReceive == 0) {
354             doReceiveNoWait(receiver);
355
356         } else {
357             doHandleReceiver(receiver);
358         }
359         //log.fatal("ADD LISTENER");
360

361     }// registerReceiverToQueue
362

363     private void doHandleReceiver(QueueReceiver receiver) {
364         synchronized (queueListeners) {
365             queueListeners.add(receiver);
366             queueListeners.notifyAll();
367         }
368     }//doHandleReceiver
369

370     /**
371      * handles non blocking requests
372      *
373      * @param receiver
374      */

375     private void doReceiveNoWait(QueueReceiver receiver) {
376         MantaBusMessage msg = null;
377         if (!pause) {
378             synchronized (currentServiceConsumerLock) {
379                 if (currentServiceConsumer == null || currentServiceConsumer.getId().equals(receiver.getConsumer().getId())) {
380                     currentServiceConsumer = receiver.getConsumer();
381                     // TODO synchronized
382
synchronized (this.impl) {
383                         msg = this.impl.receiveNoWait();
384                         msg.setSource(this.queueMaster);
385                         System.out.println("doReceiveNoWait: master = " +
386                                 queueMaster);
387                         // TODO check for valid like the normal queue
388
// int size = unsentMessages.size();
389
// for (int index = 0; index < size; index++) {
390
// msg =(MantaBusMessage) unsentMessages.get(index);
391
// if(checkValidMessage(msg ,receiver.getConsumer() )){
392
// //we found a message for this consumer
393
// unsentMessages.remove(index);
394
// sentMessages.addFirst(msg);
395
// break;
396
// }else{
397
// msg = null;
398
// }
399
// }
400
}
401                 }
402             }
403         }//if(!pause)
404

405
406         if (msg == null) {
407             msg = MantaBusMessage.getInstance();
408             msg.setPayload(null);
409             msg.setSource(this.queueMaster);
410             msg.setPriority(MantaAgentConstants.HIGH);
411             msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT);
412             msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
413             msg.addHeader(MantaBusMessageConsts.HEADER_NAME_IS_EMPTY, MantaBusMessageConsts.HEADER_VALUE_TRUE);
414
415         }
416         if (ackNotifier != null) {
417             ackNotifier.setTempListener(msg, this);
418         }
419
420         receiver.receive(msg);
421     }//doReceiveNoBlock
422

423
424     /**
425      * removes all the receivers of the Consumer from a queue
426      *
427      * @param Consumer - the consumer ID
428      */

429     protected void unregisterConsumerToQueue(ServiceConsumer consumer) {
430         // we need to lockdown the queue for this
431
// TODO synchronized
432
synchronized (this.impl) {
433             synchronized (currentServiceConsumerLock) {
434                 synchronized (queueListeners) {
435                     Iterator JavaDoc receivers = queueListeners.iterator();
436
437                     while (receivers.hasNext()) {
438                         QueueReceiver rec = (QueueReceiver) receivers.next();
439                         if (rec.getConsumer().getId().equals(consumer.getId())) {
440                             receivers.remove();
441                             //log.fatal("REMOVED LISTENER - unregister");
442
}
443                     }
444
445                     if (currentServiceConsumer != null &&
446                             currentServiceConsumer.getId().equals(consumer.getId())) {
447                         rollback();
448                     }
449
450
451                 }
452             }
453         }
454     }// unregisterConsumerToQueue
455

456
457     public void unregisterReceiverToQueue(ServiceConsumer consumer) {
458         synchronized (currentServiceConsumerLock) {
459             synchronized (queueListeners) {
460                 Iterator JavaDoc receivers = queueListeners.iterator();
461
462                 while (receivers.hasNext()) {
463                     QueueReceiver rec = (QueueReceiver) receivers.next();
464                     if (rec.getConsumer().getId().equals(consumer.getId())) {
465                         receivers.remove();
466                         //log.fatal("REMOVED LISTENER - unregister");
467
}//if
468
}//while
469

470             }
471         }
472
473     }//unregisterReceiverToQueue
474

475
476     private void rollback() {
477
478         // all the messges that are unacked by this reciver
479
// are returned to the metrix :)
480

481         // because the state of the sent messages is not clear,
482
// we need to make a copy of them
483
Iterator JavaDoc sentIter = sentMessages.iterator();
484         boolean shouldNotify = false;
485         while (sentIter.hasNext()) {
486             try {
487                 MantaBusMessage rollBackMsg = (MantaBusMessage) sentIter.next();
488                 if (ackNotifier != null) {
489                     ackNotifier.removeTempListener(rollBackMsg);
490                 }
491                 this.impl.sendToHead(PostOffice.
492                         prepareMessageShallowCopy(rollBackMsg));
493                 shouldNotify = true;
494             } catch (IOException JavaDoc e) {
495                 log.error("Rollback error", e);
496
497             }
498         }
499         // TODO synchronized
500
if (shouldNotify) {
501             this.impl.notifyAll();
502         }
503         sentMessages.clear();
504
505         synchronized (currentServiceConsumerLock) {
506             currentServiceConsumerLock.notifyAll();
507             currentServiceConsumer = null;
508
509         }
510     }
511
512     /**
513      * sends a copy of the queue to a remote agent
514      *
515      * @param agentName the receiving agent
516      */

517     protected void sendQueueCopy(ServiceConsumer consumer) {
518         ByteableList underlineCopy = new ByteableList();
519         underlineCopy.addAll(this.impl.getCopy());
520
521         int size = underlineCopy.size();
522         for (int i = 0; i < size; i++) {
523             MantaBusMessage msg = (MantaBusMessage) underlineCopy.get(i);
524             if (!checkValidMessage(msg, consumer)) {
525                 // remove the message if not needed
526
underlineCopy.remove(i);
527                 i--;
528                 size--;
529             }
530         }
531
532         QueueReceiver receiver = new QueueReceiver(consumer, 0);
533
534         MantaBusMessage msg = MantaBusMessage.getInstance();
535         msg.setPayload(underlineCopy);
536         msg.setPriority(MantaAgentConstants.HIGH);
537         msg.setDeliveryMode(MantaAgentConstants.NON_PERSISTENT);
538         msg.setMessageType(MantaBusMessageConsts.MESSAGE_TYPE_CLIENT);
539
540         MantaAddress address = new ServiceProducer(MantaAgent.getInstance().getAgentName(), this.getServiceName(), MantaService.SERVICE_TYPE_QUEUE);
541         msg.setSource(address);
542
543         receiver.receive(msg);
544
545     }//sendQueueCopy
546

547
548     /**
549      * @return Returns true if dispatcher thread has started.
550      */

551     public boolean isActive() {
552         return active;
553     }
554
555     /**
556      * @return Returns the queueMaster.
557      */

558     public QueueMaster getQueueMaster() {
559         return queueMaster;
560     }
561
562     /**
563      * @param queueMaster The queueMaster to set.
564      */

565     public void setQueueMaster(QueueMaster queueMaster) {
566         synchronized (queueMasterLockObject) {
567             if (this.queueMaster != null) {
568                 // kill the po box of the old queue master
569
MantaAgent.getInstance().getSingletonRepository().getPostOffice().handleCoordinatorDown(this.queueMaster, queueMaster);
570             }
571             this.queueMaster = queueMaster;
572             if (queueMaster != null) {
573                 queueMasterLockObject.notifyAll();
574                 if (queueMaster.getAgentName().equals(MantaAgent.getInstance().getAgentName())) {
575                     iAmQueueMaster = true;
576                 }
577                 subscriberManager.queueCoordinatorFound(queueMaster);
578             } else {
579                 if (iAmQueueMaster) {
580                     iAmQueueMaster = false;
581                 } else if (isTempQueue) {
582                     try {
583                         MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager().closeQueue(getServiceName());
584                     } catch (MantaException e) {
585                         log.error("Failed to close temp queue", e);
586                     }
587                 }
588             }
589
590         }
591     }
592
593     public boolean amIQueueMaster() {
594
595         return iAmQueueMaster;
596
597     }
598
599     public void waitForQueueMaster(long timeToWait) throws InterruptedException JavaDoc {
600         synchronized (queueMasterLockObject) {
601             if (queueMaster == null) {
602                 queueMasterLockObject.wait(timeToWait);
603
604             }// if
605
}// synchronized
606
}
607
608     public QueueSubscriberManager getSubscriberManager() {
609         return subscriberManager;
610     }
611
612
613     /**
614      * this method dequeues a message and links it with a proper receiver
615      */

616     public void doDequeue() throws InterruptedException JavaDoc {
617         // if this queue has receivers then dequeue and send to the receiver
618
waitForMessages();
619         QueueReceiver receiver = findEligibleReceiver();
620
621         if (receiver == null) {
622             // check if the consumer that is locking the queue is
623
synchronized (this.impl) {
624                 synchronized (currentServiceConsumerLock) {
625                     if (currentServiceConsumer != null) {
626                         if (!ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)) {
627                             rollback();
628                         }
629                     }
630                 }
631             }
632             Thread.sleep(100);
633             return;
634         }
635         boolean fed;
636         synchronized (this.impl) {
637             fed = feedReceiver(receiver);
638         }
639         // return the receiver to the end of the list
640
if (receiver.getNumberOfReceive() > 0) {
641             synchronized (queueListeners) {
642                 if (ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)) {
643                     queueListeners.addLast(receiver);
644
645                 } else {
646                     // This listener is not currently advertised
647
// we need to wait a while and see if he will be re-advertised
648
Thread.sleep(1500);
649                     if (ServiceActorControlCenter.isConsumerUp(currentServiceConsumer)) {
650                         queueListeners.addLast(receiver);
651                     }//else{
652
// log.fatal("REMOVED LISTENER - consumer down");
653
//}
654

655                 }
656             }
657         }//else{
658
// log.fatal("REMOVED LISTENER - consumer done");
659
//}
660

661         if (fed) {
662             synchronized (this.impl) {
663                 if (sentMessages.size() > 0) {
664                     synchronized (currentServiceConsumerLock) {
665                         currentServiceConsumer = receiver.getConsumer();
666                     }
667                 }
668             }
669
670         } else {
671             // we need to rest and wait
672
Thread.sleep(100);
673         }
674     }//doDequeue
675

676     private void checkNotPause() {
677         synchronized (pauseLockObject) {
678             if (this.pause) {
679                 try {
680                     pauseLockObject.wait();
681                 } catch (InterruptedException JavaDoc e) {
682                     // do nothing
683
}
684             }
685         }
686
687
688     }
689
690     private QueueReceiver findEligibleReceiver() throws InterruptedException JavaDoc {
691         waitForListeners();
692         checkNotPause();
693         synchronized (currentServiceConsumerLock) {
694
695             synchronized (queueListeners) {
696                 if (currentServiceConsumer == null) {
697                     try {
698                         return (QueueReceiver) queueListeners.removeFirst();
699                     } catch (Exception JavaDoc e) {
700                         return null;
701                     }
702
703                 }
704
705                 Iterator JavaDoc receivers = queueListeners.iterator();
706                 while (receivers.hasNext()) {
707                     QueueReceiver receiver = (QueueReceiver) receivers.next();
708                     if (receiver.getConsumer().getId().equals(currentServiceConsumer.getId())) {
709                         receivers.remove();
710                         return receiver;
711                     }
712                 }
713             }//synchronized
714
}
715         return null;
716
717     }
718
719     private void waitForMessages() throws InterruptedException JavaDoc {
720         this.impl.waitForMessages();
721     }
722
723
724     /**
725      * sends 1 message to the receiver while keeping order. NO
726      * SELECTOR - can't get a message from the middle of a foreign
727      * service, that's why.
728      *
729      * @param receiver the reference object to the receiving mantaRay layer
730      * @return true if able to send 1 message to receiver
731      */

732     private boolean feedReceiver(QueueReceiver receiver) {
733         MantaBusMessage msg = null;
734
735         msg = this.impl.receiveNoWait();
736         msg.setSource(this.queueMaster);
737         if (msg != null) {
738             if (msg.getValidUntil() < SystemTime.gmtCurrentTimeMillis()) {
739                 if (this.log.isInfoEnabled()) {
740                     log.info("Not sending message " + msg +
741                             " msg.getValidUntil()=" + msg.getValidUntil() +
742                             " SystemTime.gmtCurrentTimeMillis()=" +
743                             SystemTime.gmtCurrentTimeMillis() + ".");
744                 }
745                 DeadLetterHandler.HandleDeadMessage(msg);
746                 return false;
747             } else {
748                 sentMessages.addFirst(msg);
749                 if (ackNotifier != null) {
750                     ackNotifier.setTempListener(msg, this);
751                 }
752                 receiver.receive(msg);
753                 return true;
754             }
755         }
756
757         return false;
758     }
759
760     protected void enqueue(MantaBusMessage enqueuedMessage, boolean persistent) {
761         synchronized (this.impl) {
762             this.impl.send(enqueuedMessage);
763             this.impl.notifyAll();
764         }
765     }
766
767     /**
768      * checks if size of queue is too big - always false, not our problem
769      *
770      * @return true if invoking method should return without enqueueing
771      */

772     public boolean isOverflow() {
773         return false;
774     }
775
776     /**
777      * remove the acked message and release the current Receiver so others can share
778      */

779     public void gotAck(MantaBusMessage msg, MantaAddress source) {
780         synchronized (this.impl) {
781             sentMessages.remove(msg);
782             if (sentMessages.size() == 0) {
783                 synchronized (currentServiceConsumerLock) {
784                     if (currentServiceConsumer != null) {
785                         currentServiceConsumerLock.notifyAll();
786                         currentServiceConsumer = null;
787                     }
788                 }
789             }
790         }
791     }//gotAck
792

793     /**
794      * re-queue the acked message and release the current receiver so
795      * others can share
796      */

797     public void gotAckReject(MantaBusMessage msg, MantaAddress source) {
798         synchronized (this.impl) {
799             msg.setDeliveryCount(msg.getDeliveryCount() + 1);
800             this.impl.sendToHead(msg);
801             sentMessages.remove(msg);
802             if (sentMessages.size() == 0) {
803                 synchronized (currentServiceConsumerLock) {
804                     if (currentServiceConsumer != null) {
805                         currentServiceConsumerLock.notifyAll();
806                         currentServiceConsumer = null;
807                     }
808                 }
809             }
810         }
811     }
812
813     /**
814      * checks if we need to send or resend messages
815      */

816     public synchronized void recover() {
817         // nothing to do here
818
}
819
820
821     public void handleConsumerUp(ServiceConsumer consumer) {
822         // nothing to do
823
}
824
825
826     public void handleConsumerDown(ServiceConsumer consumer) {
827         unregisterConsumerToQueue(consumer);
828     }
829
830
831     public int getUnsentCount() {
832         // TODO return correct figure
833
return 0;
834     }
835
836
837     public String JavaDoc toString() {
838         StringBuffer JavaDoc buff = new StringBuffer JavaDoc();
839         buff.append(" service{");
840         buff.append(" service name=");
841         buff.append(logicalName);
842         buff.append(" serviceType=");
843         buff.append(getServiceType());
844         buff.append(" consumers=");
845         buff.append(consumers);
846         buff.append(" producers=");
847         buff.append(producers);
848         buff.append(" coordinator=");
849         buff.append(queueMaster);
850         buff.append(" persistentMode=");
851         buff.append(super.getPersistentMode());
852         buff.append(" }");
853         return buff.toString();
854     }
855
856
857     public boolean isTempQueue() {
858         return isTempQueue;
859     }
860 }
861
Popular Tags