KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > Queue


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region;
19
20 import java.io.IOException JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.Iterator JavaDoc;
23 import java.util.LinkedList JavaDoc;
24 import java.util.List JavaDoc;
25 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
26
27 import javax.jms.InvalidSelectorException JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29
30 import org.apache.activemq.broker.ConnectionContext;
31 import org.apache.activemq.broker.ProducerBrokerExchange;
32 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
33 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
34 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
35 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
36 import org.apache.activemq.broker.region.group.MessageGroupMap;
37 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
38 import org.apache.activemq.broker.region.group.MessageGroupSet;
39 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
40 import org.apache.activemq.broker.region.policy.DispatchPolicy;
41 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
42 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
43 import org.apache.activemq.command.ActiveMQDestination;
44 import org.apache.activemq.command.ConsumerId;
45 import org.apache.activemq.command.ExceptionResponse;
46 import org.apache.activemq.command.Message;
47 import org.apache.activemq.command.MessageAck;
48 import org.apache.activemq.command.MessageId;
49 import org.apache.activemq.command.ProducerAck;
50 import org.apache.activemq.command.Response;
51 import org.apache.activemq.filter.BooleanExpression;
52 import org.apache.activemq.filter.MessageEvaluationContext;
53 import org.apache.activemq.kaha.Store;
54 import org.apache.activemq.memory.UsageManager;
55 import org.apache.activemq.selector.SelectorParser;
56 import org.apache.activemq.store.MessageRecoveryListener;
57 import org.apache.activemq.store.MessageStore;
58 import org.apache.activemq.thread.Task;
59 import org.apache.activemq.thread.TaskRunner;
60 import org.apache.activemq.thread.TaskRunnerFactory;
61 import org.apache.activemq.thread.Valve;
62 import org.apache.activemq.transaction.Synchronization;
63 import org.apache.activemq.util.BrokerSupport;
64 import org.apache.commons.logging.Log;
65 import org.apache.commons.logging.LogFactory;
66
67 /**
68  * The Queue is a List of MessageEntry objects that are dispatched to matching
69  * subscriptions.
70  *
71  * @version $Revision: 1.28 $
72  */

73 public class Queue implements Destination, Task {
74
75     private final Log log;
76
77     private final ActiveMQDestination destination;
78     private final List JavaDoc consumers = new CopyOnWriteArrayList JavaDoc();
79     private final Valve dispatchValve = new Valve(true);
80     private final UsageManager usageManager;
81     private final DestinationStatistics destinationStatistics = new DestinationStatistics();
82     private PendingMessageCursor messages;
83     private final LinkedList JavaDoc pagedInMessages = new LinkedList JavaDoc();
84     private LockOwner exclusiveOwner;
85     private MessageGroupMap messageGroupOwners;
86
87     private int garbageSize = 0;
88     private int garbageSizeBeforeCollection = 1000;
89     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
90     private final MessageStore store;
91     private int highestSubscriptionPriority = Integer.MIN_VALUE;
92     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
93     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
94     private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
95     private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
96     private final Object JavaDoc exclusiveLockMutex = new Object JavaDoc();
97     private final Object JavaDoc doDispatchMutex = new Object JavaDoc();
98     private TaskRunner taskRunner;
99     private boolean started = false;
100     
101     public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
102             TaskRunnerFactory taskFactory, Store tmpStore) throws Exception JavaDoc {
103         this.destination = destination;
104         this.usageManager = new UsageManager(memoryManager,destination.toString());
105         this.usageManager.setUsagePortion(1.0f);
106         this.store = store;
107         if(destination.isTemporary()){
108             this.messages=new VMPendingMessageCursor();
109         }else{
110             this.messages=new StoreQueueCursor(this,tmpStore);
111         }
112         
113         this.taskRunner = taskFactory.createTaskRunner(this, "Queue "+destination.getPhysicalName());
114
115         // Let the store know what usage manager we are using so that he can
116
// flush messages to disk
117
// when usage gets high.
118
if (store != null) {
119             store.setUsageManager(usageManager);
120         }
121
122         //let's copy the enabled property from the parent DestinationStatistics
123
this.destinationStatistics.setEnabled(parentStats.isEnabled());
124         destinationStatistics.setParent(parentStats);
125         this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
126
127         
128     }
129     
130     public void initialize() throws Exception JavaDoc{
131         if(store!=null){
132             // Restore the persistent messages.
133
messages.setUsageManager(getUsageManager());
134             if(messages.isRecoveryRequired()){
135                 store.recover(new MessageRecoveryListener(){
136
137                     public void recoverMessage(Message message){
138                         // Message could have expired while it was being loaded..
139
if(message.isExpired()){
140                             // TODO remove from store
141
return;
142                         }
143                         message.setRegionDestination(Queue.this);
144                         synchronized(messages){
145                             try{
146                                 messages.addMessageLast(message);
147                             }catch(Exception JavaDoc e){
148                                 log.fatal("Failed to add message to cursor",e);
149                             }
150                         }
151                         destinationStatistics.getMessages().increment();
152                     }
153
154                     public void recoverMessageReference(MessageId messageReference) throws Exception JavaDoc{
155                         throw new RuntimeException JavaDoc("Should not be called.");
156                     }
157
158                     public void finished(){
159                     }
160
161                     public boolean hasSpace(){
162                         return true;
163                     }
164                 });
165             }
166         }
167     }
168
169     /**
170      * Lock a node
171      *
172      * @param node
173      * @param lockOwner
174      * @return true if can be locked
175      * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference,
176      * org.apache.activemq.broker.region.LockOwner)
177      */

178     public boolean lock(MessageReference node,LockOwner lockOwner){
179         synchronized(exclusiveLockMutex){
180             if(exclusiveOwner==lockOwner){
181                 return true;
182             }
183             if(exclusiveOwner!=null){
184                 return false;
185             }
186             if(lockOwner.getLockPriority()<highestSubscriptionPriority){
187                 return false;
188             }
189             if(lockOwner.isLockExclusive()){
190                 exclusiveOwner=lockOwner;
191             }
192         }
193         return true;
194     }
195
196     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception JavaDoc {
197         sub.add(context, this);
198         destinationStatistics.getConsumers().increment();
199         maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
200
201         
202         
203         MessageEvaluationContext msgContext=context.getMessageEvaluationContext();
204         try{
205             synchronized(consumers){
206                 if (sub.getConsumerInfo().isExclusive()) {
207                     // Add to front of list to ensure that an exclusive consumer gets all messages
208
// before non-exclusive consumers
209
consumers.add(0, sub);
210                 } else {
211                     consumers.add(sub);
212                 }
213             }
214             // page in messages
215
doPageIn();
216             // synchronize with dispatch method so that no new messages are sent
217
// while
218
// setting up a subscription. avoid out of order messages, duplicates
219
// etc.
220
dispatchValve.turnOff();
221             if (sub.getConsumerInfo().getPriority() > highestSubscriptionPriority) {
222                 highestSubscriptionPriority = sub.getConsumerInfo().getPriority();
223             }
224             msgContext.setDestination(destination);
225             synchronized(pagedInMessages){
226                 // Add all the matching messages in the queue to the
227
// subscription.
228
for(Iterator JavaDoc i=pagedInMessages.iterator();i.hasNext();){
229                     QueueMessageReference node=(QueueMessageReference)i.next();
230                     if(node.isDropped()){
231                         continue;
232                     }
233                     try{
234                         msgContext.setMessageReference(node);
235                         if(sub.matches(node,msgContext)){
236                             sub.add(node);
237                         }
238                     }catch(IOException JavaDoc e){
239                         log.warn("Could not load message: "+e,e);
240                     }
241                 }
242             }
243         }finally{
244             msgContext.clear();
245             dispatchValve.turnOn();
246         }
247     }
248
249     public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception JavaDoc {
250
251         destinationStatistics.getConsumers().decrement();
252         maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
253
254         // synchronize with dispatch method so that no new messages are sent
255
// while
256
// removing up a subscription.
257
dispatchValve.turnOff();
258
259         try {
260
261             synchronized (consumers) {
262                 consumers.remove(sub);
263                 if (consumers.isEmpty()) {
264                     messages.gc();
265                 }
266             }
267             sub.remove(context, this);
268
269             highestSubscriptionPriority = calcHighestSubscriptionPriority();
270
271             boolean wasExclusiveOwner = false;
272             if (exclusiveOwner == sub) {
273                 exclusiveOwner = null;
274                 wasExclusiveOwner = true;
275             }
276
277             ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
278             MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId);
279
280             if (!sub.getConsumerInfo().isBrowser()) {
281                 MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
282                 try {
283                     msgContext.setDestination(destination);
284
285                     // lets copy the messages to dispatch to avoid deadlock
286
List JavaDoc messagesToDispatch = new ArrayList JavaDoc();
287                     synchronized (pagedInMessages) {
288                         for(Iterator JavaDoc i = pagedInMessages.iterator();i.hasNext();) {
289                             QueueMessageReference node = (QueueMessageReference) i.next();
290                             if (node.isDropped()) {
291                                 continue;
292                             }
293
294                             String JavaDoc groupID = node.getGroupID();
295
296                             // Re-deliver all messages that the sub locked
297
if (node.getLockOwner() == sub || wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
298                                 messagesToDispatch.add(node);
299                             }
300                         }
301                     }
302
303                     // now lets dispatch from the copy of the collection to
304
// avoid deadlocks
305
for (Iterator JavaDoc iter = messagesToDispatch.iterator(); iter.hasNext();) {
306                         QueueMessageReference node = (QueueMessageReference) iter.next();
307                         node.incrementRedeliveryCounter();
308                         node.unlock();
309                         msgContext.setMessageReference(node);
310                         dispatchPolicy.dispatch(node, msgContext, consumers);
311                     }
312                 }
313                 finally {
314                     msgContext.clear();
315                 }
316             }
317         }
318         finally {
319             dispatchValve.turnOn();
320         }
321
322     }
323     
324     private final LinkedList JavaDoc<Runnable JavaDoc> messagesWaitingForSpace = new LinkedList JavaDoc<Runnable JavaDoc>();
325     private final Runnable JavaDoc sendMessagesWaitingForSpaceTask = new Runnable JavaDoc() {
326         public void run() {
327             
328             // We may need to do this in async thread since this is run for within a synchronization
329
// that the UsageManager is holding.
330

331             synchronized( messagesWaitingForSpace ) {
332                 while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
333                     Runnable JavaDoc op = messagesWaitingForSpace.removeFirst();
334                     op.run();
335                 }
336             }
337             
338         };
339     };
340
341     public void send(final ProducerBrokerExchange producerExchange,final Message message) throws Exception JavaDoc {
342         final ConnectionContext context = producerExchange.getConnectionContext();
343         // There is delay between the client sending it and it arriving at the
344
// destination.. it may have expired.
345
if(message.isExpired()){
346             if (log.isDebugEnabled()) {
347                 log.debug("Expired message: " + message);
348             }
349             if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
350                 ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
351                 context.getConnection().dispatchAsync(ack);
352             }
353             return;
354         }
355         if ( context.isProducerFlowControl() && usageManager.isFull() ) {
356             if(usageManager.isSendFailIfNoSpace()){
357                 throw new javax.jms.ResourceAllocationException JavaDoc("Usage Manager memory limit reached");
358             }
359                 
360             // We can avoid blocking due to low usage if the producer is sending a sync message or
361
// if it is using a producer window
362
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) {
363                 synchronized( messagesWaitingForSpace ) {
364                     messagesWaitingForSpace.add(new Runnable JavaDoc() {
365                         public void run() {
366                             
367                             // While waiting for space to free up... the message may have expired.
368
if(message.isExpired()){
369                                 if (log.isDebugEnabled()) {
370                                     log.debug("Expired message: " + message);
371                                 }
372                                 
373                                 if( !message.isResponseRequired() ) {
374                                     ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
375                                     context.getConnection().dispatchAsync(ack);
376                                 }
377                                 return;
378                             }
379                             
380                             
381                             try {
382                                 doMessageSend(producerExchange, message);
383                             } catch (Exception JavaDoc e) {
384                                 if( message.isResponseRequired() ) {
385                                     ExceptionResponse response = new ExceptionResponse(e);
386                                     response.setCorrelationId(message.getCommandId());
387                                     context.getConnection().dispatchAsync(response);
388                                 }
389                             }
390                         }
391                     });
392                     
393                     // If the user manager is not full, then the task will not get called..
394
if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) {
395                         // so call it directly here.
396
sendMessagesWaitingForSpaceTask.run();
397                     }
398                     context.setDontSendReponse(true);
399                     return;
400                 }
401                 
402             } else {
403                 
404                 // Producer flow control cannot be used, so we have do the flow control at the broker
405
// by blocking this thread until there is space available.
406
while( !usageManager.waitForSpace(1000) ) {
407                     if( context.getStopping().get() )
408                         throw new IOException JavaDoc("Connection closed, send aborted.");
409                 }
410                 
411                 // The usage manager could have delayed us by the time
412
// we unblock the message could have expired..
413
if(message.isExpired()){
414                     if (log.isDebugEnabled()) {
415                         log.debug("Expired message: " + message);
416                     }
417                     return;
418                 }
419             }
420         }
421         doMessageSend(producerExchange, message);
422     }
423
424     private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException JavaDoc, Exception JavaDoc {
425         final ConnectionContext context = producerExchange.getConnectionContext();
426         message.setRegionDestination(this);
427         if(store!=null&&message.isPersistent()){
428             store.addMessage(context,message);
429         }
430         if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
431             ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
432             context.getConnection().dispatchAsync(ack);
433         }
434         if(context.isInTransaction()){
435             // If this is a transacted message.. increase the usage now so that a big TX does not blow up
436
// our memory. This increment is decremented once the tx finishes..
437
message.incrementReferenceCount();
438             context.getTransaction().addSynchronization(new Synchronization(){
439                 public void afterCommit() throws Exception JavaDoc{
440                     try {
441                         // It could take while before we receive the commit
442
// op, by that time the message could have expired..
443
if(message.isExpired()){
444                             // TODO: remove message from store.
445
if (log.isDebugEnabled()) {
446                                 log.debug("Expired message: " + message);
447                             }
448                             return;
449                         }
450                         sendMessage(context,message);
451                     } finally {
452                         message.decrementReferenceCount();
453                     }
454                 }
455                 
456                 @Override JavaDoc
457                 public void afterRollback() throws Exception JavaDoc {
458                     message.decrementReferenceCount();
459                 }
460             });
461         }else{
462             // Add to the pending list, this takes care of incrementing the usage manager.
463
sendMessage(context,message);
464         }
465     }
466
467     public void dispose(ConnectionContext context) throws IOException JavaDoc {
468         if (store != null) {
469             store.removeAllMessages(context);
470         }
471         destinationStatistics.setParent(null);
472     }
473
474     public void dropEvent() {
475         dropEvent(false);
476     }
477
478     public void dropEvent(boolean skipGc){
479         // TODO: need to also decrement when messages expire.
480
destinationStatistics.getMessages().decrement();
481         synchronized(pagedInMessages){
482             garbageSize++;
483         }
484         if(!skipGc&&garbageSize>garbageSizeBeforeCollection){
485             gc();
486         }
487         try{
488             taskRunner.wakeup();
489         }catch(InterruptedException JavaDoc e){
490             log.warn("Task Runner failed to wakeup ",e);
491         }
492     }
493
494     public void gc() {
495         synchronized (pagedInMessages) {
496             for(Iterator JavaDoc i = pagedInMessages.iterator(); i.hasNext();) {
497                 // Remove dropped messages from the queue.
498
QueueMessageReference node = (QueueMessageReference) i.next();
499                 if (node.isDropped()) {
500                     garbageSize--;
501                     i.remove();
502                     continue;
503                 }
504             }
505         }
506     }
507
508     public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException JavaDoc {
509         if (store != null && node.isPersistent()) {
510             // the original ack may be a ranged ack, but we are trying to delete
511
// a specific
512
// message store here so we need to convert to a non ranged ack.
513
if (ack.getMessageCount() > 0) {
514                 // Dup the ack
515
MessageAck a = new MessageAck();
516                 ack.copy(a);
517                 ack = a;
518                 // Convert to non-ranged.
519
ack.setFirstMessageId(node.getMessageId());
520                 ack.setLastMessageId(node.getMessageId());
521                 ack.setMessageCount(1);
522             }
523             store.removeMessage(context, ack);
524         }
525     }
526
527     Message loadMessage(MessageId messageId) throws IOException JavaDoc {
528         Message msg = store.getMessage(messageId);
529         if (msg != null) {
530             msg.setRegionDestination(this);
531         }
532         return msg;
533     }
534
535     public String JavaDoc toString() {
536         int size = 0;
537         synchronized (messages) {
538             size = messages.size();
539         }
540         return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + usageManager.getPercentUsage()
541                 + "%, size=" + size + ", in flight groups=" + messageGroupOwners;
542     }
543
544     public void start() throws Exception JavaDoc {
545         started = true;
546         if (usageManager != null) {
547             usageManager.start();
548         }
549         messages.start();
550         doPageIn(false);
551     }
552
553     public void stop() throws Exception JavaDoc {
554         started = false;
555         if( taskRunner!=null ) {
556             taskRunner.shutdown();
557         }
558         if(messages!=null){
559             messages.stop();
560         }
561         if (usageManager != null) {
562             usageManager.stop();
563         }
564     }
565
566     // Properties
567
// -------------------------------------------------------------------------
568
public ActiveMQDestination getActiveMQDestination() {
569         return destination;
570     }
571
572     public String JavaDoc getDestination() {
573         return destination.getPhysicalName();
574     }
575
576     public UsageManager getUsageManager() {
577         return usageManager;
578     }
579
580     public DestinationStatistics getDestinationStatistics() {
581         return destinationStatistics;
582     }
583
584     public MessageGroupMap getMessageGroupOwners() {
585         if (messageGroupOwners == null) {
586             messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
587         }
588         return messageGroupOwners;
589     }
590
591     public DispatchPolicy getDispatchPolicy() {
592         return dispatchPolicy;
593     }
594
595     public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
596         this.dispatchPolicy = dispatchPolicy;
597     }
598
599     public DeadLetterStrategy getDeadLetterStrategy() {
600         return deadLetterStrategy;
601     }
602
603     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
604         this.deadLetterStrategy = deadLetterStrategy;
605     }
606
607     public MessageGroupMapFactory getMessageGroupMapFactory() {
608         return messageGroupMapFactory;
609     }
610
611     public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
612         this.messageGroupMapFactory = messageGroupMapFactory;
613     }
614
615     public String JavaDoc getName() {
616         return getActiveMQDestination().getPhysicalName();
617     }
618
619     public PendingMessageCursor getMessages(){
620         return this.messages;
621     }
622     public void setMessages(PendingMessageCursor messages){
623         this.messages=messages;
624     }
625
626     // Implementation methods
627
// -------------------------------------------------------------------------
628
private MessageReference createMessageReference(Message message) {
629         MessageReference result = new IndirectMessageReference(this, store, message);
630         result.decrementReferenceCount();
631         return result;
632     }
633
634     
635     private int calcHighestSubscriptionPriority() {
636         int rc = Integer.MIN_VALUE;
637         synchronized (consumers) {
638             for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
639                 Subscription sub = (Subscription) iter.next();
640                 if (sub.getConsumerInfo().getPriority() > rc) {
641                     rc = sub.getConsumerInfo().getPriority();
642                 }
643             }
644         }
645         return rc;
646     }
647
648     public MessageStore getMessageStore() {
649         return store;
650     }
651
652     public Message[] browse() {
653         ArrayList JavaDoc l = new ArrayList JavaDoc();
654         try{
655             doPageIn(true);
656         }catch(Exception JavaDoc e){
657             log.error("caught an exception browsing " + this,e);
658         }
659         synchronized(pagedInMessages) {
660             for (Iterator JavaDoc i = pagedInMessages.iterator();i.hasNext();) {
661                 MessageReference r = (MessageReference)i.next();
662                 r.incrementReferenceCount();
663                 try {
664                     Message m = r.getMessage();
665                     if (m != null) {
666                         l.add(m);
667                     }
668                 }catch(IOException JavaDoc e){
669                     log.error("caught an exception browsing " + this,e);
670                 }
671                 finally {
672                     r.decrementReferenceCount();
673                 }
674             }
675         }
676         synchronized(messages){
677             try{
678                 messages.reset();
679                 while(messages.hasNext()){
680                     try{
681                         MessageReference r=messages.next();
682                         r.incrementReferenceCount();
683                         try{
684                             Message m=r.getMessage();
685                             if(m!=null){
686                                 l.add(m);
687                             }
688                         }finally{
689                             r.decrementReferenceCount();
690                         }
691                     }catch(IOException JavaDoc e){
692                         log.error("caught an exception brwsing "+this,e);
693                     }
694                 }
695             }finally{
696                 messages.release();
697             }
698         }
699
700         return (Message[]) l.toArray(new Message[l.size()]);
701     }
702
703     public Message getMessage(String JavaDoc messageId){
704         synchronized(messages){
705             try{
706                 messages.reset();
707                 while(messages.hasNext()){
708                     try{
709                         MessageReference r=messages.next();
710                         if(messageId.equals(r.getMessageId().toString())){
711                             r.incrementReferenceCount();
712                             try{
713                                 Message m=r.getMessage();
714                                 if(m!=null){
715                                     return m;
716                                 }
717                             }finally{
718                                 r.decrementReferenceCount();
719                             }
720                             break;
721                         }
722                     }catch(IOException JavaDoc e){
723                         log.error("got an exception retrieving message "+messageId);
724                     }
725                 }
726             }finally{
727                 messages.release();
728             }
729         }
730         return null;
731     }
732
733     public void purge() throws Exception JavaDoc {
734         
735         pageInMessages();
736         
737         synchronized (pagedInMessages) {
738             ConnectionContext c = createConnectionContext();
739             for(Iterator JavaDoc i = pagedInMessages.iterator(); i.hasNext();){
740                 try {
741                     QueueMessageReference r = (QueueMessageReference) i.next();
742
743                     // We should only delete messages that can be locked.
744
if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
745                         MessageAck ack = new MessageAck();
746                         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
747                         ack.setDestination(destination);
748                         ack.setMessageID(r.getMessageId());
749                         acknowledge(c, null, ack, r);
750                         r.drop();
751                         dropEvent(true);
752                     }
753                 }
754                 catch (IOException JavaDoc e) {
755                 }
756             }
757
758             // Run gc() by hand. Had we run it in the loop it could be
759
// quite expensive.
760
gc();
761         }
762     }
763     
764
765     /**
766      * Removes the message matching the given messageId
767      */

768     public boolean removeMessage(String JavaDoc messageId) throws Exception JavaDoc {
769         return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
770     }
771
772     /**
773      * Removes the messages matching the given selector
774      *
775      * @return the number of messages removed
776      */

777     public int removeMatchingMessages(String JavaDoc selector) throws Exception JavaDoc {
778         return removeMatchingMessages(selector, -1);
779     }
780     
781     /**
782      * Removes the messages matching the given selector up to the maximum number of matched messages
783      *
784      * @return the number of messages removed
785      */

786     public int removeMatchingMessages(String JavaDoc selector, int maximumMessages) throws Exception JavaDoc {
787         return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
788     }
789
790     /**
791      * Removes the messages matching the given filter up to the maximum number of matched messages
792      *
793      * @return the number of messages removed
794      */

795     public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception JavaDoc {
796         pageInMessages();
797         int counter = 0;
798         synchronized (pagedInMessages) {
799             ConnectionContext c = createConnectionContext();
800            for(Iterator JavaDoc i = pagedInMessages.iterator(); i.hasNext();) {
801                IndirectMessageReference r = (IndirectMessageReference) i.next();
802                 if (filter.evaluate(c, r)) {
803                     removeMessage(c, r);
804                     if (++counter >= maximumMessages && maximumMessages > 0) {
805                         break;
806                     }
807                     
808                 }
809             }
810         }
811         return counter;
812     }
813
814     /**
815      * Copies the message matching the given messageId
816      */

817     public boolean copyMessageTo(ConnectionContext context, String JavaDoc messageId, ActiveMQDestination dest) throws Exception JavaDoc {
818         return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
819     }
820     
821     /**
822      * Copies the messages matching the given selector
823      *
824      * @return the number of messages copied
825      */

826     public int copyMatchingMessagesTo(ConnectionContext context, String JavaDoc selector, ActiveMQDestination dest) throws Exception JavaDoc {
827         return copyMatchingMessagesTo(context, selector, dest, -1);
828     }
829     
830     /**
831      * Copies the messages matching the given selector up to the maximum number of matched messages
832      *
833      * @return the number of messages copied
834      */

835     public int copyMatchingMessagesTo(ConnectionContext context, String JavaDoc selector, ActiveMQDestination dest, int maximumMessages) throws Exception JavaDoc {
836         return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
837     }
838
839     /**
840      * Copies the messages matching the given filter up to the maximum number of matched messages
841      *
842      * @return the number of messages copied
843      */

844     public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception JavaDoc {
845         pageInMessages();
846         int counter = 0;
847         synchronized (pagedInMessages) {
848             for(Iterator JavaDoc i = pagedInMessages.iterator(); i.hasNext();) {
849                 MessageReference r = (MessageReference) i.next();
850                 if (filter.evaluate(context, r)) {
851                     r.incrementReferenceCount();
852                     try {
853                         Message m = r.getMessage();
854                         BrokerSupport.resend(context, m, dest);
855                         if (++counter >= maximumMessages && maximumMessages > 0) {
856                             break;
857                         }
858                     }
859                     finally {
860                         r.decrementReferenceCount();
861                     }
862                 }
863             }
864         }
865         return counter;
866     }
867
868     /**
869      * Moves the message matching the given messageId
870      */

871     public boolean moveMessageTo(ConnectionContext context, String JavaDoc messageId, ActiveMQDestination dest) throws Exception JavaDoc {
872         return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
873     }
874     
875     /**
876      * Moves the messages matching the given selector
877      *
878      * @return the number of messages removed
879      */

880     public int moveMatchingMessagesTo(ConnectionContext context, String JavaDoc selector, ActiveMQDestination dest) throws Exception JavaDoc {
881         return moveMatchingMessagesTo(context, selector, dest, -1);
882     }
883     
884     /**
885      * Moves the messages matching the given selector up to the maximum number of matched messages
886      */

887     public int moveMatchingMessagesTo(ConnectionContext context, String JavaDoc selector, ActiveMQDestination dest, int maximumMessages) throws Exception JavaDoc {
888         return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
889     }
890
891     /**
892      * Moves the messages matching the given filter up to the maximum number of matched messages
893      */

894     public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception JavaDoc {
895         pageInMessages();
896         int counter = 0;
897         synchronized (pagedInMessages) {
898             for(Iterator JavaDoc i = pagedInMessages.iterator(); i.hasNext();) {
899                 IndirectMessageReference r = (IndirectMessageReference) i.next();
900                 if (filter.evaluate(context, r)) {
901                     // We should only move messages that can be locked.
902
if (lockMessage(r)) {
903                         r.incrementReferenceCount();
904                         try {
905                             Message m = r.getMessage();
906                             BrokerSupport.resend(context, m, dest);
907                             removeMessage(context, r);
908                             if (++counter >= maximumMessages && maximumMessages > 0) {
909                                 break;
910                             }
911                         }
912                         finally {
913                             r.decrementReferenceCount();
914                         }
915                     }
916                 }
917             }
918         }
919         return counter;
920     }
921     
922     /**
923      * @return
924      * @see org.apache.activemq.thread.Task#iterate()
925      */

926     public boolean iterate(){
927         try{
928             pageInMessages(false);
929          }catch(Exception JavaDoc e){
930              log.error("Failed to page in more queue messages ",e);
931          }
932         return false;
933     }
934
935     protected MessageReferenceFilter createMessageIdFilter(final String JavaDoc messageId) {
936         return new MessageReferenceFilter() {
937             public boolean evaluate(ConnectionContext context, MessageReference r) {
938                 return messageId.equals(r.getMessageId().toString());
939             }
940         };
941     }
942     
943     protected MessageReferenceFilter createSelectorFilter(String JavaDoc selector) throws InvalidSelectorException JavaDoc {
944         final BooleanExpression selectorExpression = new SelectorParser().parse(selector);
945
946         return new MessageReferenceFilter() {
947             public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException JavaDoc {
948                 MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
949                 
950                 messageEvaluationContext.setMessageReference(r);
951                 if (messageEvaluationContext.getDestination() == null) {
952                     messageEvaluationContext.setDestination(getActiveMQDestination());
953                 }
954                 
955                 return selectorExpression.matches(messageEvaluationContext);
956             }
957         };
958     }
959
960         
961     protected void removeMessage(ConnectionContext c, IndirectMessageReference r) throws IOException JavaDoc {
962         MessageAck ack = new MessageAck();
963         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
964         ack.setDestination(destination);
965         ack.setMessageID(r.getMessageId());
966         acknowledge(c, null, ack, r);
967         r.drop();
968         dropEvent();
969     }
970
971     protected boolean lockMessage(IndirectMessageReference r) {
972         return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER);
973     }
974
975     protected ConnectionContext createConnectionContext() {
976         ConnectionContext answer = new ConnectionContext();
977         answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
978         return answer;
979     }
980     
981       
982     private void sendMessage(final ConnectionContext context,Message msg) throws Exception JavaDoc{
983         synchronized(messages){
984             messages.addMessageLast(msg);
985         }
986         destinationStatistics.getEnqueues().increment();
987         destinationStatistics.getMessages().increment();
988         pageInMessages(false);
989     }
990     
991     private List JavaDoc doPageIn() throws Exception JavaDoc{
992         return doPageIn(true);
993     }
994     
995     private List JavaDoc doPageIn(boolean force) throws Exception JavaDoc{
996         final int toPageIn=maximumPagedInMessages-pagedInMessages.size();
997         List JavaDoc result=null;
998         if((force||!consumers.isEmpty())&&toPageIn>0){
999             messages.setMaxBatchSize(toPageIn);
1000            try{
1001                dispatchValve.increment();
1002                int count=0;
1003                result=new ArrayList JavaDoc(toPageIn);
1004                synchronized(messages){
1005                    try{
1006                        messages.reset();
1007                        while(messages.hasNext()&&count<toPageIn){
1008                            MessageReference node=messages.next();
1009                            messages.remove();
1010                            if(!node.isExpired()){
1011                                node=createMessageReference(node.getMessage());
1012                                result.add(node);
1013                                count++;
1014                            }else{
1015                                if (log.isDebugEnabled()) {
1016                                    log.debug("Expired message: " + node);
1017                                }
1018                            }
1019                        }
1020                    }finally{
1021                        messages.release();
1022                    }
1023                }
1024                synchronized(pagedInMessages){
1025                    pagedInMessages.addAll(result);
1026                }
1027            }finally{
1028                queueMsgConext.clear();
1029                dispatchValve.decrement();
1030            }
1031        }
1032        return result;
1033    }
1034
1035    private void doDispatch(List JavaDoc list) throws Exception JavaDoc{
1036        if(list!=null&&!list.isEmpty()){
1037            try{
1038                dispatchValve.increment();
1039                for(int i=0;i<list.size();i++){
1040                    MessageReference node=(MessageReference)list.get(i);
1041                    queueMsgConext.setDestination(destination);
1042                    queueMsgConext.setMessageReference(node);
1043                    dispatchPolicy.dispatch(node,queueMsgConext,consumers);
1044                }
1045            }finally{
1046                queueMsgConext.clear();
1047                dispatchValve.decrement();
1048            }
1049        }
1050    }
1051    
1052    private void pageInMessages() throws Exception JavaDoc{
1053        pageInMessages(true);
1054    }
1055    private void pageInMessages(boolean force) throws Exception JavaDoc{
1056        synchronized(doDispatchMutex) {
1057            doDispatch(doPageIn(force));
1058        }
1059    }
1060
1061    
1062}
1063
Popular Tags