KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > Topic


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

55 /**
56  * The Topic is a destination that sends a copy of a message to every active
57  * Subscription registered.
58  *
59  * @version $Revision: 1.21 $
60  */

61 public class Topic implements Destination {
62     private static final Log log = LogFactory.getLog(Topic.class);
63     protected final ActiveMQDestination destination;
64     protected final CopyOnWriteArrayList JavaDoc consumers = new CopyOnWriteArrayList JavaDoc();
65     protected final Valve dispatchValve = new Valve(true);
66     protected final TopicMessageStore store;//this could be NULL! (If an advsiory)
67
protected final UsageManager usageManager;
68     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
69
70     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
71     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedSizedSubscriptionRecoveryPolicy();
72     private boolean sendAdvisoryIfNoConsumers;
73     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
74     private final ConcurrentHashMap JavaDoc durableSubcribers = new ConcurrentHashMap JavaDoc();
75     
76     public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
77             TaskRunnerFactory taskFactory) {
78
79         this.destination = destination;
80         this.store = store; //this could be NULL! (If an advsiory)
81
this.usageManager = new UsageManager(memoryManager,destination.toString());
82         this.usageManager.setUsagePortion(1.0f);
83         
84         // Let the store know what usage manager we are using so that he can flush messages to disk
85
// when usage gets high.
86
if( store!=null ) {
87             store.setUsageManager(usageManager);
88         }
89         
90         //let's copy the enabled property from the parent DestinationStatistics
91
this.destinationStatistics.setEnabled(parentStats.isEnabled());
92         this.destinationStatistics.setParent(parentStats);
93     }
94
95     public boolean lock(MessageReference node, LockOwner sub) {
96         return true;
97     }
98
99     public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception JavaDoc {
100         
101         sub.add(context, this);
102         destinationStatistics.getConsumers().increment();
103
104         if ( !sub.getConsumerInfo().isDurable() ) {
105
106             // Do a retroactive recovery if needed.
107
if (sub.getConsumerInfo().isRetroactive()) {
108                 
109                 // synchronize with dispatch method so that no new messages are sent
110
// while we are recovering a subscription to avoid out of order messages.
111
dispatchValve.turnOff();
112                 try {
113                     
114                     synchronized(consumers) {
115                         consumers.add(sub);
116                     }
117                     subscriptionRecoveryPolicy.recover(context, this, sub);
118                     
119                 } finally {
120                     dispatchValve.turnOn();
121                 }
122                 
123             } else {
124                 synchronized(consumers) {
125                     consumers.add(sub);
126                 }
127             }
128         } else {
129             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
130             durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
131         }
132     }
133     
134     public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception JavaDoc {
135         if ( !sub.getConsumerInfo().isDurable() ) {
136             destinationStatistics.getConsumers().decrement();
137             synchronized(consumers) {
138                 consumers.remove(sub);
139             }
140         }
141         sub.remove(context, this);
142     }
143        
144     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException JavaDoc {
145         if (store != null) {
146             store.deleteSubscription(key.clientId, key.subscriptionName);
147             Object JavaDoc removed = durableSubcribers.remove(key);
148             if(removed != null) {
149                 destinationStatistics.getConsumers().decrement();
150             }
151         }
152     }
153     
154     public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception JavaDoc {
155         // synchronize with dispatch method so that no new messages are sent
156
// while
157
// we are recovering a subscription to avoid out of order messages.
158
dispatchValve.turnOff();
159         try {
160         
161             synchronized(consumers) {
162                 consumers.add(subscription);
163             }
164             
165             if (store == null )
166                 return;
167             
168             // Recover the durable subscription.
169
String JavaDoc clientId = subscription.getClientId();
170             String JavaDoc subscriptionName = subscription.getSubscriptionName();
171             String JavaDoc selector = subscription.getConsumerInfo().getSelector();
172             SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName);
173             if (info != null) {
174                 // Check to see if selector changed.
175
String JavaDoc s1 = info.getSelector();
176                 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
177                     // Need to delete the subscription
178
store.deleteSubscription(clientId, subscriptionName);
179                     info = null;
180                 }
181             }
182             // Do we need to create the subscription?
183
if (info == null) {
184                 store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive());
185             }
186     
187             final MessageEvaluationContext msgContext = new MessageEvaluationContext();
188             msgContext.setDestination(destination);
189             if(subscription.isRecoveryRequired()){
190                 store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){
191                     public void recoverMessage(Message message) throws Exception JavaDoc{
192                         message.setRegionDestination(Topic.this);
193                         try{
194                             msgContext.setMessageReference(message);
195                             if(subscription.matches(message,msgContext)){
196                                 subscription.add(message);
197                             }
198                         }catch(InterruptedException JavaDoc e){
199                             Thread.currentThread().interrupt();
200                         }catch(IOException JavaDoc e){
201                             // TODO: Need to handle this better.
202
e.printStackTrace();
203                         }
204                     }
205
206                     public void recoverMessageReference(MessageId messageReference) throws Exception JavaDoc{
207                         throw new RuntimeException JavaDoc("Should not be called.");
208                     }
209
210                     public void finished(){}
211
212                     public boolean hasSpace(){
213                         return true;
214                     }
215                 });
216             }
217             
218             
219         
220         }
221         finally {
222             dispatchValve.turnOn();
223         }
224     }
225
226     public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception JavaDoc {
227         synchronized(consumers) {
228             consumers.remove(sub);
229         }
230         sub.remove(context, this);
231     }
232     
233     
234     protected void recoverRetroactiveMessages(ConnectionContext context,Subscription subscription) throws Exception JavaDoc{
235         if(subscription.getConsumerInfo().isRetroactive()){
236             subscriptionRecoveryPolicy.recover(context,this,subscription);
237         }
238     }
239     
240
241     private final LinkedList JavaDoc<Runnable JavaDoc> messagesWaitingForSpace = new LinkedList JavaDoc<Runnable JavaDoc>();
242     private final Runnable JavaDoc sendMessagesWaitingForSpaceTask = new Runnable JavaDoc() {
243         public void run() {
244             
245             // We may need to do this in async thread since this is run for within a synchronization
246
// that the UsageManager is holding.
247

248             synchronized( messagesWaitingForSpace ) {
249                 while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
250                     Runnable JavaDoc op = messagesWaitingForSpace.removeFirst();
251                     op.run();
252                 }
253             }
254             
255         };
256     };
257
258     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception JavaDoc {
259         final ConnectionContext context = producerExchange.getConnectionContext();
260         
261         // There is delay between the client sending it and it arriving at the
262
// destination.. it may have expired.
263
if( message.isExpired() ) {
264             if (log.isDebugEnabled()) {
265                 log.debug("Expired message: " + message);
266             }
267             if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
268                 ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
269                 context.getConnection().dispatchAsync(ack);
270             }
271             return;
272         }
273         
274         if ( context.isProducerFlowControl() && usageManager.isFull() ) {
275             if(usageManager.isSendFailIfNoSpace()){
276                 throw new javax.jms.ResourceAllocationException JavaDoc("Usage Manager memory limit reached");
277             }
278                 
279             // We can avoid blocking due to low usage if the producer is sending a sync message or
280
// if it is using a producer window
281
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) {
282                 synchronized( messagesWaitingForSpace ) {
283                     messagesWaitingForSpace.add(new Runnable JavaDoc() {
284                         public void run() {
285                             
286                             // While waiting for space to free up... the message may have expired.
287
if(message.isExpired()){
288                                 if (log.isDebugEnabled()) {
289                                     log.debug("Expired message: " + message);
290                                 }
291                                 
292                                 if( !message.isResponseRequired() ) {
293                                     ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
294                                     context.getConnection().dispatchAsync(ack);
295                                 }
296                                 return;
297                             }
298                             
299                             
300                             try {
301                                 doMessageSend(producerExchange, message);
302                             } catch (Exception JavaDoc e) {
303                                 if( message.isResponseRequired() ) {
304                                     ExceptionResponse response = new ExceptionResponse(e);
305                                     response.setCorrelationId(message.getCommandId());
306                                     context.getConnection().dispatchAsync(response);
307                                 }
308                             }
309                         }
310                     });
311                     
312                     // If the user manager is not full, then the task will not get called..
313
if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) {
314                         // so call it directly here.
315
sendMessagesWaitingForSpaceTask.run();
316                     }
317                     context.setDontSendReponse(true);
318                     return;
319                 }
320                 
321             } else {
322                 
323                 // Producer flow control cannot be used, so we have do the flow control at the broker
324
// by blocking this thread until there is space available.
325
while( !usageManager.waitForSpace(1000) ) {
326                     if( context.getStopping().get() )
327                         throw new IOException JavaDoc("Connection closed, send aborted.");
328                 }
329                 
330                 // The usage manager could have delayed us by the time
331
// we unblock the message could have expired..
332
if(message.isExpired()){
333                     if (log.isDebugEnabled()) {
334                         log.debug("Expired message: " + message);
335                     }
336                     return;
337                 }
338             }
339         }
340
341         doMessageSend(producerExchange, message);
342     }
343
344     private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException JavaDoc, Exception JavaDoc {
345         final ConnectionContext context = producerExchange.getConnectionContext();
346         message.setRegionDestination(this);
347
348         if (store != null && message.isPersistent() && !canOptimizeOutPersistence() )
349             store.addMessage(context, message);
350
351         message.incrementReferenceCount();
352         try {
353
354             if (context.isInTransaction()) {
355                 context.getTransaction().addSynchronization(new Synchronization() {
356                     public void afterCommit() throws Exception JavaDoc {
357                         // It could take while before we receive the commit
358
// operration.. by that time the message could have expired..
359
if( message.isExpired() ) {
360                             // TODO: remove message from store.
361
return;
362                         }
363                         dispatch(context, message);
364                     }
365                 });
366
367             }
368             else {
369                 dispatch(context, message);
370             }
371
372         }
373         finally {
374             message.decrementReferenceCount();
375         }
376     }
377
378     private boolean canOptimizeOutPersistence() {
379         return durableSubcribers.size()==0;
380     }
381
382     public String JavaDoc toString() {
383         return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
384     }
385
386     public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException JavaDoc {
387         if (store != null && node.isPersistent()) {
388             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
389             store.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId());
390         }
391     }
392
393     public void dispose(ConnectionContext context) throws IOException JavaDoc {
394         if (store != null) {
395             store.removeAllMessages(context);
396         }
397         destinationStatistics.setParent(null);
398     }
399
400     public void gc() {
401     }
402
403     public Message loadMessage(MessageId messageId) throws IOException JavaDoc {
404         return store != null ? store.getMessage(messageId) : null;
405     }
406
407     public void start() throws Exception JavaDoc {
408         this.subscriptionRecoveryPolicy.start();
409         if (usageManager != null) {
410             usageManager.start();
411         }
412         
413     }
414
415     public void stop() throws Exception JavaDoc {
416         this.subscriptionRecoveryPolicy.stop();
417         if (usageManager != null) {
418             usageManager.stop();
419         }
420     }
421     
422     public Message[] browse(){
423         final Set JavaDoc result=new CopyOnWriteArraySet JavaDoc();
424         try{
425             if(store!=null){
426                 store.recover(new MessageRecoveryListener(){
427                     public void recoverMessage(Message message) throws Exception JavaDoc{
428                         result.add(message);
429                     }
430
431                     public void recoverMessageReference(MessageId messageReference) throws Exception JavaDoc{}
432
433                     public void finished(){}
434
435                     public boolean hasSpace(){
436                        return true;
437                     }
438                 });
439                 Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination());
440                 if(msgs!=null){
441                     for(int i=0;i<msgs.length;i++){
442                         result.add(msgs[i]);
443                     }
444                 }
445             }
446         }catch(Throwable JavaDoc e){
447             log.warn("Failed to browse Topic: "+getActiveMQDestination().getPhysicalName(),e);
448         }
449         return (Message[]) result.toArray(new Message[result.size()]);
450     }
451
452     // Properties
453
// -------------------------------------------------------------------------
454

455     public UsageManager getUsageManager() {
456         return usageManager;
457     }
458
459     public DestinationStatistics getDestinationStatistics() {
460         return destinationStatistics;
461     }
462
463     public ActiveMQDestination getActiveMQDestination() {
464         return destination;
465     }
466
467     public String JavaDoc getDestination() {
468         return destination.getPhysicalName();
469     }
470     
471     public DispatchPolicy getDispatchPolicy() {
472         return dispatchPolicy;
473     }
474
475     public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
476         this.dispatchPolicy = dispatchPolicy;
477     }
478
479     public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
480         return subscriptionRecoveryPolicy;
481     }
482
483     public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
484         this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
485     }
486
487     public boolean isSendAdvisoryIfNoConsumers() {
488         return sendAdvisoryIfNoConsumers;
489     }
490
491     public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
492         this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
493     }
494
495     public MessageStore getMessageStore() {
496         return store;
497     }
498
499     public DeadLetterStrategy getDeadLetterStrategy() {
500         return deadLetterStrategy;
501     }
502
503     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
504         this.deadLetterStrategy = deadLetterStrategy;
505     }
506
507     public String JavaDoc getName() {
508         return getActiveMQDestination().getPhysicalName();
509     }
510
511
512     // Implementation methods
513
// -------------------------------------------------------------------------
514
protected void dispatch(final ConnectionContext context, Message message) throws Exception JavaDoc {
515         destinationStatistics.getEnqueues().increment();
516         dispatchValve.increment();
517         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
518         try {
519             if (!subscriptionRecoveryPolicy.add(context, message)) {
520                 return;
521             }
522             synchronized(consumers) {
523                 if (consumers.isEmpty()) {
524                     onMessageWithNoConsumers(context, message);
525                     return;
526                 }
527             }
528
529             msgContext.setDestination(destination);
530             msgContext.setMessageReference(message);
531
532             if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
533                 onMessageWithNoConsumers(context, message);
534             }
535         }
536         finally {
537             msgContext.clear();
538             dispatchValve.decrement();
539         }
540     }
541
542     /**
543      * Provides a hook to allow messages with no consumer to be processed in
544      * some way - such as to send to a dead letter queue or something..
545      */

546     protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception JavaDoc {
547         if (!message.isPersistent()) {
548             if (sendAdvisoryIfNoConsumers) {
549                 // allow messages with no consumers to be dispatched to a dead
550
// letter queue
551
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
552                     
553                     // The original destination and transaction id do not get filled when the message is first sent,
554
// it is only populated if the message is routed to another destination like the DLQ
555
if( message.getOriginalDestination()!=null )
556                         message.setOriginalDestination(message.getDestination());
557                     if( message.getOriginalTransactionId()!=null )
558                         message.setOriginalTransactionId(message.getTransactionId());
559
560                     ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
561                     message.setDestination(advisoryTopic);
562                     message.setTransactionId(null);
563
564                     // Disable flow control for this since since we don't want to block.
565
boolean originalFlowControl = context.isProducerFlowControl();
566                     try {
567                         context.setProducerFlowControl(false);
568                         ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
569                         producerExchange.setMutable(false);
570                         producerExchange.setConnectionContext(context);
571                         context.getBroker().send(producerExchange, message);
572                     } finally {
573                         context.setProducerFlowControl(originalFlowControl);
574                     }
575                     
576                 }
577             }
578         }
579     }
580
581
582 }
583
Popular Tags