|                                                                                                              1
 18  package org.apache.activemq.broker.region;
 19
 20  import java.io.IOException
  ; 21  import java.util.LinkedList
  ; 22  import java.util.Set
  ; 23  import java.util.concurrent.ConcurrentHashMap
  ; 24  import java.util.concurrent.CopyOnWriteArrayList
  ; 25  import java.util.concurrent.CopyOnWriteArraySet
  ; 26  import org.apache.activemq.advisory.AdvisorySupport;
 27  import org.apache.activemq.broker.ConnectionContext;
 28  import org.apache.activemq.broker.ProducerBrokerExchange;
 29  import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 30  import org.apache.activemq.broker.region.policy.DispatchPolicy;
 31  import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy;
 32  import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 33  import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 34  import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
 35  import org.apache.activemq.command.ActiveMQDestination;
 36  import org.apache.activemq.command.ActiveMQTopic;
 37  import org.apache.activemq.command.ExceptionResponse;
 38  import org.apache.activemq.command.Message;
 39  import org.apache.activemq.command.MessageAck;
 40  import org.apache.activemq.command.MessageId;
 41  import org.apache.activemq.command.ProducerAck;
 42  import org.apache.activemq.command.SubscriptionInfo;
 43  import org.apache.activemq.filter.MessageEvaluationContext;
 44  import org.apache.activemq.memory.UsageManager;
 45  import org.apache.activemq.store.MessageRecoveryListener;
 46  import org.apache.activemq.store.MessageStore;
 47  import org.apache.activemq.store.TopicMessageStore;
 48  import org.apache.activemq.thread.TaskRunnerFactory;
 49  import org.apache.activemq.thread.Valve;
 50  import org.apache.activemq.transaction.Synchronization;
 51  import org.apache.activemq.util.SubscriptionKey;
 52  import org.apache.commons.logging.Log;
 53  import org.apache.commons.logging.LogFactory;
 54
 55
 61  public class Topic implements Destination {
 62      private static final Log log = LogFactory.getLog(Topic.class);
 63      protected final ActiveMQDestination destination;
 64      protected final CopyOnWriteArrayList
  consumers = new CopyOnWriteArrayList  (); 65      protected final Valve dispatchValve = new Valve(true);
 66      protected final TopicMessageStore store;    protected final UsageManager usageManager;
 68      protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
 69
 70      private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
 71      private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedSizedSubscriptionRecoveryPolicy();
 72      private boolean sendAdvisoryIfNoConsumers;
 73      private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
 74      private final ConcurrentHashMap
  durableSubcribers = new ConcurrentHashMap  (); 75
 76      public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
 77              TaskRunnerFactory taskFactory) {
 78
 79          this.destination = destination;
 80          this.store = store;         this.usageManager = new UsageManager(memoryManager,destination.toString());
 82          this.usageManager.setUsagePortion(1.0f);
 83
 84                          if( store!=null ) {
 87              store.setUsageManager(usageManager);
 88          }
 89
 90                  this.destinationStatistics.setEnabled(parentStats.isEnabled());
 92          this.destinationStatistics.setParent(parentStats);
 93      }
 94
 95      public boolean lock(MessageReference node, LockOwner sub) {
 96          return true;
 97      }
 98
 99      public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception
  { 100
 101         sub.add(context, this);
 102         destinationStatistics.getConsumers().increment();
 103
 104         if ( !sub.getConsumerInfo().isDurable() ) {
 105
 106                         if (sub.getConsumerInfo().isRetroactive()) {
 108
 109                                                 dispatchValve.turnOff();
 112                 try {
 113
 114                     synchronized(consumers) {
 115                         consumers.add(sub);
 116                     }
 117                     subscriptionRecoveryPolicy.recover(context, this, sub);
 118
 119                 } finally {
 120                     dispatchValve.turnOn();
 121                 }
 122
 123             } else {
 124                 synchronized(consumers) {
 125                     consumers.add(sub);
 126                 }
 127             }
 128         } else {
 129             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
 130             durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
 131         }
 132     }
 133
 134     public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception
  { 135         if ( !sub.getConsumerInfo().isDurable() ) {
 136             destinationStatistics.getConsumers().decrement();
 137             synchronized(consumers) {
 138                 consumers.remove(sub);
 139             }
 140         }
 141         sub.remove(context, this);
 142     }
 143
 144     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException
  { 145         if (store != null) {
 146             store.deleteSubscription(key.clientId, key.subscriptionName);
 147             Object
  removed = durableSubcribers.remove(key); 148             if(removed != null) {
 149                 destinationStatistics.getConsumers().decrement();
 150             }
 151         }
 152     }
 153
 154     public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception
  { 155                                 dispatchValve.turnOff();
 159         try {
 160
 161             synchronized(consumers) {
 162                 consumers.add(subscription);
 163             }
 164
 165             if (store == null )
 166                 return;
 167
 168                         String
  clientId = subscription.getClientId(); 170             String
  subscriptionName = subscription.getSubscriptionName(); 171             String
  selector = subscription.getConsumerInfo().getSelector(); 172             SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName);
 173             if (info != null) {
 174                                 String
  s1 = info.getSelector(); 176                 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
 177                                         store.deleteSubscription(clientId, subscriptionName);
 179                     info = null;
 180                 }
 181             }
 182                         if (info == null) {
 184                 store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive());
 185             }
 186
 187             final MessageEvaluationContext msgContext = new MessageEvaluationContext();
 188             msgContext.setDestination(destination);
 189             if(subscription.isRecoveryRequired()){
 190                 store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){
 191                     public void recoverMessage(Message message) throws Exception
  { 192                         message.setRegionDestination(Topic.this);
 193                         try{
 194                             msgContext.setMessageReference(message);
 195                             if(subscription.matches(message,msgContext)){
 196                                 subscription.add(message);
 197                             }
 198                         }catch(InterruptedException
  e){ 199                             Thread.currentThread().interrupt();
 200                         }catch(IOException
  e){ 201                                                         e.printStackTrace();
 203                         }
 204                     }
 205
 206                     public void recoverMessageReference(MessageId messageReference) throws Exception
  { 207                         throw new RuntimeException
  ("Should not be called."); 208                     }
 209
 210                     public void finished(){}
 211
 212                     public boolean hasSpace(){
 213                         return true;
 214                     }
 215                 });
 216             }
 217
 218
 219
 220         }
 221         finally {
 222             dispatchValve.turnOn();
 223         }
 224     }
 225
 226     public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception
  { 227         synchronized(consumers) {
 228             consumers.remove(sub);
 229         }
 230         sub.remove(context, this);
 231     }
 232
 233
 234     protected void recoverRetroactiveMessages(ConnectionContext context,Subscription subscription) throws Exception
  { 235         if(subscription.getConsumerInfo().isRetroactive()){
 236             subscriptionRecoveryPolicy.recover(context,this,subscription);
 237         }
 238     }
 239
 240
 241     private final LinkedList
  <Runnable  > messagesWaitingForSpace = new LinkedList  <Runnable  >(); 242     private final Runnable
  sendMessagesWaitingForSpaceTask = new Runnable  () { 243         public void run() {
 244
 245
 248             synchronized( messagesWaitingForSpace ) {
 249                 while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
 250                     Runnable
  op = messagesWaitingForSpace.removeFirst(); 251                     op.run();
 252                 }
 253             }
 254
 255         };
 256     };
 257
 258     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception
  { 259         final ConnectionContext context = producerExchange.getConnectionContext();
 260
 261                         if( message.isExpired() ) {
 264             if (log.isDebugEnabled()) {
 265                 log.debug("Expired message: " + message);
 266             }
 267             if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
 268                 ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
 269                 context.getConnection().dispatchAsync(ack);
 270             }
 271             return;
 272         }
 273
 274         if ( context.isProducerFlowControl() && usageManager.isFull() ) {
 275             if(usageManager.isSendFailIfNoSpace()){
 276                 throw new javax.jms.ResourceAllocationException
  ("Usage Manager memory limit reached"); 277             }
 278
 279                                     if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) {
 282                 synchronized( messagesWaitingForSpace ) {
 283                     messagesWaitingForSpace.add(new Runnable
  () { 284                         public void run() {
 285
 286                                                         if(message.isExpired()){
 288                                 if (log.isDebugEnabled()) {
 289                                     log.debug("Expired message: " + message);
 290                                 }
 291
 292                                 if( !message.isResponseRequired() ) {
 293                                     ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
 294                                     context.getConnection().dispatchAsync(ack);
 295                                 }
 296                                 return;
 297                             }
 298
 299
 300                             try {
 301                                 doMessageSend(producerExchange, message);
 302                             } catch (Exception
  e) { 303                                 if( message.isResponseRequired() ) {
 304                                     ExceptionResponse response = new ExceptionResponse(e);
 305                                     response.setCorrelationId(message.getCommandId());
 306                                     context.getConnection().dispatchAsync(response);
 307                                 }
 308                             }
 309                         }
 310                     });
 311
 312                                         if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) {
 314                                                 sendMessagesWaitingForSpaceTask.run();
 316                     }
 317                     context.setDontSendReponse(true);
 318                     return;
 319                 }
 320
 321             } else {
 322
 323                                                 while( !usageManager.waitForSpace(1000) ) {
 326                     if( context.getStopping().get() )
 327                         throw new IOException
  ("Connection closed, send aborted."); 328                 }
 329
 330                                                 if(message.isExpired()){
 333                     if (log.isDebugEnabled()) {
 334                         log.debug("Expired message: " + message);
 335                     }
 336                     return;
 337                 }
 338             }
 339         }
 340
 341         doMessageSend(producerExchange, message);
 342     }
 343
 344     private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException
  , Exception  { 345         final ConnectionContext context = producerExchange.getConnectionContext();
 346         message.setRegionDestination(this);
 347
 348         if (store != null && message.isPersistent() && !canOptimizeOutPersistence() )
 349             store.addMessage(context, message);
 350
 351         message.incrementReferenceCount();
 352         try {
 353
 354             if (context.isInTransaction()) {
 355                 context.getTransaction().addSynchronization(new Synchronization() {
 356                     public void afterCommit() throws Exception
  { 357                                                                         if( message.isExpired() ) {
 360                                                         return;
 362                         }
 363                         dispatch(context, message);
 364                     }
 365                 });
 366
 367             }
 368             else {
 369                 dispatch(context, message);
 370             }
 371
 372         }
 373         finally {
 374             message.decrementReferenceCount();
 375         }
 376     }
 377
 378     private boolean canOptimizeOutPersistence() {
 379         return durableSubcribers.size()==0;
 380     }
 381
 382     public String
  toString() { 383         return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
 384     }
 385
 386     public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException
  { 387         if (store != null && node.isPersistent()) {
 388             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
 389             store.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId());
 390         }
 391     }
 392
 393     public void dispose(ConnectionContext context) throws IOException
  { 394         if (store != null) {
 395             store.removeAllMessages(context);
 396         }
 397         destinationStatistics.setParent(null);
 398     }
 399
 400     public void gc() {
 401     }
 402
 403     public Message loadMessage(MessageId messageId) throws IOException
  { 404         return store != null ? store.getMessage(messageId) : null;
 405     }
 406
 407     public void start() throws Exception
  { 408         this.subscriptionRecoveryPolicy.start();
 409         if (usageManager != null) {
 410             usageManager.start();
 411         }
 412
 413     }
 414
 415     public void stop() throws Exception
  { 416         this.subscriptionRecoveryPolicy.stop();
 417         if (usageManager != null) {
 418             usageManager.stop();
 419         }
 420     }
 421
 422     public Message[] browse(){
 423         final Set
  result=new CopyOnWriteArraySet  (); 424         try{
 425             if(store!=null){
 426                 store.recover(new MessageRecoveryListener(){
 427                     public void recoverMessage(Message message) throws Exception
  { 428                         result.add(message);
 429                     }
 430
 431                     public void recoverMessageReference(MessageId messageReference) throws Exception
  {} 432
 433                     public void finished(){}
 434
 435                     public boolean hasSpace(){
 436                        return true;
 437                     }
 438                 });
 439                 Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination());
 440                 if(msgs!=null){
 441                     for(int i=0;i<msgs.length;i++){
 442                         result.add(msgs[i]);
 443                     }
 444                 }
 445             }
 446         }catch(Throwable
  e){ 447             log.warn("Failed to browse Topic: "+getActiveMQDestination().getPhysicalName(),e);
 448         }
 449         return (Message[]) result.toArray(new Message[result.size()]);
 450     }
 451
 452
 455     public UsageManager getUsageManager() {
 456         return usageManager;
 457     }
 458
 459     public DestinationStatistics getDestinationStatistics() {
 460         return destinationStatistics;
 461     }
 462
 463     public ActiveMQDestination getActiveMQDestination() {
 464         return destination;
 465     }
 466
 467     public String
  getDestination() { 468         return destination.getPhysicalName();
 469     }
 470
 471     public DispatchPolicy getDispatchPolicy() {
 472         return dispatchPolicy;
 473     }
 474
 475     public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
 476         this.dispatchPolicy = dispatchPolicy;
 477     }
 478
 479     public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
 480         return subscriptionRecoveryPolicy;
 481     }
 482
 483     public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
 484         this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
 485     }
 486
 487     public boolean isSendAdvisoryIfNoConsumers() {
 488         return sendAdvisoryIfNoConsumers;
 489     }
 490
 491     public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
 492         this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
 493     }
 494
 495     public MessageStore getMessageStore() {
 496         return store;
 497     }
 498
 499     public DeadLetterStrategy getDeadLetterStrategy() {
 500         return deadLetterStrategy;
 501     }
 502
 503     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
 504         this.deadLetterStrategy = deadLetterStrategy;
 505     }
 506
 507     public String
  getName() { 508         return getActiveMQDestination().getPhysicalName();
 509     }
 510
 511
 512             protected void dispatch(final ConnectionContext context, Message message) throws Exception
  { 515         destinationStatistics.getEnqueues().increment();
 516         dispatchValve.increment();
 517         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
 518         try {
 519             if (!subscriptionRecoveryPolicy.add(context, message)) {
 520                 return;
 521             }
 522             synchronized(consumers) {
 523                 if (consumers.isEmpty()) {
 524                     onMessageWithNoConsumers(context, message);
 525                     return;
 526                 }
 527             }
 528
 529             msgContext.setDestination(destination);
 530             msgContext.setMessageReference(message);
 531
 532             if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
 533                 onMessageWithNoConsumers(context, message);
 534             }
 535         }
 536         finally {
 537             msgContext.clear();
 538             dispatchValve.decrement();
 539         }
 540     }
 541
 542
 546     protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception
  { 547         if (!message.isPersistent()) {
 548             if (sendAdvisoryIfNoConsumers) {
 549                                                 if (!AdvisorySupport.isAdvisoryTopic(destination)) {
 552
 553                                                             if( message.getOriginalDestination()!=null )
 556                         message.setOriginalDestination(message.getDestination());
 557                     if( message.getOriginalTransactionId()!=null )
 558                         message.setOriginalTransactionId(message.getTransactionId());
 559
 560                     ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
 561                     message.setDestination(advisoryTopic);
 562                     message.setTransactionId(null);
 563
 564                                         boolean originalFlowControl = context.isProducerFlowControl();
 566                     try {
 567                         context.setProducerFlowControl(false);
 568                         ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
 569                         producerExchange.setMutable(false);
 570                         producerExchange.setConnectionContext(context);
 571                         context.getBroker().send(producerExchange, message);
 572                     } finally {
 573                         context.setProducerFlowControl(originalFlowControl);
 574                     }
 575
 576                 }
 577             }
 578         }
 579     }
 580
 581
 582 }
 583
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |