1 18 package org.apache.activemq.broker.region; 19 20 import java.io.IOException ; 21 import java.util.LinkedList ; 22 import java.util.Set ; 23 import java.util.concurrent.ConcurrentHashMap ; 24 import java.util.concurrent.CopyOnWriteArrayList ; 25 import java.util.concurrent.CopyOnWriteArraySet ; 26 import org.apache.activemq.advisory.AdvisorySupport; 27 import org.apache.activemq.broker.ConnectionContext; 28 import org.apache.activemq.broker.ProducerBrokerExchange; 29 import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 30 import org.apache.activemq.broker.region.policy.DispatchPolicy; 31 import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPolicy; 32 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; 33 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 34 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 35 import org.apache.activemq.command.ActiveMQDestination; 36 import org.apache.activemq.command.ActiveMQTopic; 37 import org.apache.activemq.command.ExceptionResponse; 38 import org.apache.activemq.command.Message; 39 import org.apache.activemq.command.MessageAck; 40 import org.apache.activemq.command.MessageId; 41 import org.apache.activemq.command.ProducerAck; 42 import org.apache.activemq.command.SubscriptionInfo; 43 import org.apache.activemq.filter.MessageEvaluationContext; 44 import org.apache.activemq.memory.UsageManager; 45 import org.apache.activemq.store.MessageRecoveryListener; 46 import org.apache.activemq.store.MessageStore; 47 import org.apache.activemq.store.TopicMessageStore; 48 import org.apache.activemq.thread.TaskRunnerFactory; 49 import org.apache.activemq.thread.Valve; 50 import org.apache.activemq.transaction.Synchronization; 51 import org.apache.activemq.util.SubscriptionKey; 52 import org.apache.commons.logging.Log; 53 import org.apache.commons.logging.LogFactory; 54 55 61 public class Topic implements Destination { 62 private static final Log log = LogFactory.getLog(Topic.class); 63 protected final ActiveMQDestination destination; 64 protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList (); 65 protected final Valve dispatchValve = new Valve(true); 66 protected final TopicMessageStore store; protected final UsageManager usageManager; 68 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 69 70 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 71 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedSizedSubscriptionRecoveryPolicy(); 72 private boolean sendAdvisoryIfNoConsumers; 73 private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); 74 private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap (); 75 76 public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats, 77 TaskRunnerFactory taskFactory) { 78 79 this.destination = destination; 80 this.store = store; this.usageManager = new UsageManager(memoryManager,destination.toString()); 82 this.usageManager.setUsagePortion(1.0f); 83 84 if( store!=null ) { 87 store.setUsageManager(usageManager); 88 } 89 90 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 92 this.destinationStatistics.setParent(parentStats); 93 } 94 95 public boolean lock(MessageReference node, LockOwner sub) { 96 return true; 97 } 98 99 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 100 101 sub.add(context, this); 102 destinationStatistics.getConsumers().increment(); 103 104 if ( !sub.getConsumerInfo().isDurable() ) { 105 106 if (sub.getConsumerInfo().isRetroactive()) { 108 109 dispatchValve.turnOff(); 112 try { 113 114 synchronized(consumers) { 115 consumers.add(sub); 116 } 117 subscriptionRecoveryPolicy.recover(context, this, sub); 118 119 } finally { 120 dispatchValve.turnOn(); 121 } 122 123 } else { 124 synchronized(consumers) { 125 consumers.add(sub); 126 } 127 } 128 } else { 129 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 130 durableSubcribers.put(dsub.getSubscriptionKey(), dsub); 131 } 132 } 133 134 public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception { 135 if ( !sub.getConsumerInfo().isDurable() ) { 136 destinationStatistics.getConsumers().decrement(); 137 synchronized(consumers) { 138 consumers.remove(sub); 139 } 140 } 141 sub.remove(context, this); 142 } 143 144 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException { 145 if (store != null) { 146 store.deleteSubscription(key.clientId, key.subscriptionName); 147 Object removed = durableSubcribers.remove(key); 148 if(removed != null) { 149 destinationStatistics.getConsumers().decrement(); 150 } 151 } 152 } 153 154 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 155 dispatchValve.turnOff(); 159 try { 160 161 synchronized(consumers) { 162 consumers.add(subscription); 163 } 164 165 if (store == null ) 166 return; 167 168 String clientId = subscription.getClientId(); 170 String subscriptionName = subscription.getSubscriptionName(); 171 String selector = subscription.getConsumerInfo().getSelector(); 172 SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName); 173 if (info != null) { 174 String s1 = info.getSelector(); 176 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) { 177 store.deleteSubscription(clientId, subscriptionName); 179 info = null; 180 } 181 } 182 if (info == null) { 184 store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive()); 185 } 186 187 final MessageEvaluationContext msgContext = new MessageEvaluationContext(); 188 msgContext.setDestination(destination); 189 if(subscription.isRecoveryRequired()){ 190 store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){ 191 public void recoverMessage(Message message) throws Exception { 192 message.setRegionDestination(Topic.this); 193 try{ 194 msgContext.setMessageReference(message); 195 if(subscription.matches(message,msgContext)){ 196 subscription.add(message); 197 } 198 }catch(InterruptedException e){ 199 Thread.currentThread().interrupt(); 200 }catch(IOException e){ 201 e.printStackTrace(); 203 } 204 } 205 206 public void recoverMessageReference(MessageId messageReference) throws Exception { 207 throw new RuntimeException ("Should not be called."); 208 } 209 210 public void finished(){} 211 212 public boolean hasSpace(){ 213 return true; 214 } 215 }); 216 } 217 218 219 220 } 221 finally { 222 dispatchValve.turnOn(); 223 } 224 } 225 226 public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception { 227 synchronized(consumers) { 228 consumers.remove(sub); 229 } 230 sub.remove(context, this); 231 } 232 233 234 protected void recoverRetroactiveMessages(ConnectionContext context,Subscription subscription) throws Exception { 235 if(subscription.getConsumerInfo().isRetroactive()){ 236 subscriptionRecoveryPolicy.recover(context,this,subscription); 237 } 238 } 239 240 241 private final LinkedList <Runnable > messagesWaitingForSpace = new LinkedList <Runnable >(); 242 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable () { 243 public void run() { 244 245 248 synchronized( messagesWaitingForSpace ) { 249 while( !usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) { 250 Runnable op = messagesWaitingForSpace.removeFirst(); 251 op.run(); 252 } 253 } 254 255 }; 256 }; 257 258 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 259 final ConnectionContext context = producerExchange.getConnectionContext(); 260 261 if( message.isExpired() ) { 264 if (log.isDebugEnabled()) { 265 log.debug("Expired message: " + message); 266 } 267 if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) { 268 ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); 269 context.getConnection().dispatchAsync(ack); 270 } 271 return; 272 } 273 274 if ( context.isProducerFlowControl() && usageManager.isFull() ) { 275 if(usageManager.isSendFailIfNoSpace()){ 276 throw new javax.jms.ResourceAllocationException ("Usage Manager memory limit reached"); 277 } 278 279 if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || message.isResponseRequired() ) { 282 synchronized( messagesWaitingForSpace ) { 283 messagesWaitingForSpace.add(new Runnable () { 284 public void run() { 285 286 if(message.isExpired()){ 288 if (log.isDebugEnabled()) { 289 log.debug("Expired message: " + message); 290 } 291 292 if( !message.isResponseRequired() ) { 293 ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); 294 context.getConnection().dispatchAsync(ack); 295 } 296 return; 297 } 298 299 300 try { 301 doMessageSend(producerExchange, message); 302 } catch (Exception e) { 303 if( message.isResponseRequired() ) { 304 ExceptionResponse response = new ExceptionResponse(e); 305 response.setCorrelationId(message.getCommandId()); 306 context.getConnection().dispatchAsync(response); 307 } 308 } 309 } 310 }); 311 312 if( !usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask) ) { 314 sendMessagesWaitingForSpaceTask.run(); 316 } 317 context.setDontSendReponse(true); 318 return; 319 } 320 321 } else { 322 323 while( !usageManager.waitForSpace(1000) ) { 326 if( context.getStopping().get() ) 327 throw new IOException ("Connection closed, send aborted."); 328 } 329 330 if(message.isExpired()){ 333 if (log.isDebugEnabled()) { 334 log.debug("Expired message: " + message); 335 } 336 return; 337 } 338 } 339 } 340 341 doMessageSend(producerExchange, message); 342 } 343 344 private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException , Exception { 345 final ConnectionContext context = producerExchange.getConnectionContext(); 346 message.setRegionDestination(this); 347 348 if (store != null && message.isPersistent() && !canOptimizeOutPersistence() ) 349 store.addMessage(context, message); 350 351 message.incrementReferenceCount(); 352 try { 353 354 if (context.isInTransaction()) { 355 context.getTransaction().addSynchronization(new Synchronization() { 356 public void afterCommit() throws Exception { 357 if( message.isExpired() ) { 360 return; 362 } 363 dispatch(context, message); 364 } 365 }); 366 367 } 368 else { 369 dispatch(context, message); 370 } 371 372 } 373 finally { 374 message.decrementReferenceCount(); 375 } 376 } 377 378 private boolean canOptimizeOutPersistence() { 379 return durableSubcribers.size()==0; 380 } 381 382 public String toString() { 383 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 384 } 385 386 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException { 387 if (store != null && node.isPersistent()) { 388 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 389 store.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId()); 390 } 391 } 392 393 public void dispose(ConnectionContext context) throws IOException { 394 if (store != null) { 395 store.removeAllMessages(context); 396 } 397 destinationStatistics.setParent(null); 398 } 399 400 public void gc() { 401 } 402 403 public Message loadMessage(MessageId messageId) throws IOException { 404 return store != null ? store.getMessage(messageId) : null; 405 } 406 407 public void start() throws Exception { 408 this.subscriptionRecoveryPolicy.start(); 409 if (usageManager != null) { 410 usageManager.start(); 411 } 412 413 } 414 415 public void stop() throws Exception { 416 this.subscriptionRecoveryPolicy.stop(); 417 if (usageManager != null) { 418 usageManager.stop(); 419 } 420 } 421 422 public Message[] browse(){ 423 final Set result=new CopyOnWriteArraySet (); 424 try{ 425 if(store!=null){ 426 store.recover(new MessageRecoveryListener(){ 427 public void recoverMessage(Message message) throws Exception { 428 result.add(message); 429 } 430 431 public void recoverMessageReference(MessageId messageReference) throws Exception {} 432 433 public void finished(){} 434 435 public boolean hasSpace(){ 436 return true; 437 } 438 }); 439 Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 440 if(msgs!=null){ 441 for(int i=0;i<msgs.length;i++){ 442 result.add(msgs[i]); 443 } 444 } 445 } 446 }catch(Throwable e){ 447 log.warn("Failed to browse Topic: "+getActiveMQDestination().getPhysicalName(),e); 448 } 449 return (Message[]) result.toArray(new Message[result.size()]); 450 } 451 452 455 public UsageManager getUsageManager() { 456 return usageManager; 457 } 458 459 public DestinationStatistics getDestinationStatistics() { 460 return destinationStatistics; 461 } 462 463 public ActiveMQDestination getActiveMQDestination() { 464 return destination; 465 } 466 467 public String getDestination() { 468 return destination.getPhysicalName(); 469 } 470 471 public DispatchPolicy getDispatchPolicy() { 472 return dispatchPolicy; 473 } 474 475 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 476 this.dispatchPolicy = dispatchPolicy; 477 } 478 479 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 480 return subscriptionRecoveryPolicy; 481 } 482 483 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) { 484 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; 485 } 486 487 public boolean isSendAdvisoryIfNoConsumers() { 488 return sendAdvisoryIfNoConsumers; 489 } 490 491 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 492 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 493 } 494 495 public MessageStore getMessageStore() { 496 return store; 497 } 498 499 public DeadLetterStrategy getDeadLetterStrategy() { 500 return deadLetterStrategy; 501 } 502 503 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 504 this.deadLetterStrategy = deadLetterStrategy; 505 } 506 507 public String getName() { 508 return getActiveMQDestination().getPhysicalName(); 509 } 510 511 512 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 515 destinationStatistics.getEnqueues().increment(); 516 dispatchValve.increment(); 517 MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); 518 try { 519 if (!subscriptionRecoveryPolicy.add(context, message)) { 520 return; 521 } 522 synchronized(consumers) { 523 if (consumers.isEmpty()) { 524 onMessageWithNoConsumers(context, message); 525 return; 526 } 527 } 528 529 msgContext.setDestination(destination); 530 msgContext.setMessageReference(message); 531 532 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 533 onMessageWithNoConsumers(context, message); 534 } 535 } 536 finally { 537 msgContext.clear(); 538 dispatchValve.decrement(); 539 } 540 } 541 542 546 protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception { 547 if (!message.isPersistent()) { 548 if (sendAdvisoryIfNoConsumers) { 549 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 552 553 if( message.getOriginalDestination()!=null ) 556 message.setOriginalDestination(message.getDestination()); 557 if( message.getOriginalTransactionId()!=null ) 558 message.setOriginalTransactionId(message.getTransactionId()); 559 560 ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); 561 message.setDestination(advisoryTopic); 562 message.setTransactionId(null); 563 564 boolean originalFlowControl = context.isProducerFlowControl(); 566 try { 567 context.setProducerFlowControl(false); 568 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 569 producerExchange.setMutable(false); 570 producerExchange.setConnectionContext(context); 571 context.getBroker().send(producerExchange, message); 572 } finally { 573 context.setProducerFlowControl(originalFlowControl); 574 } 575 576 } 577 } 578 } 579 } 580 581 582 } 583 | Popular Tags |