1 14 15 package org.apache.activemq.broker.region; 16 17 import javax.jms.JMSException ; 18 import org.apache.activemq.broker.ConnectionContext; 19 import org.apache.activemq.command.ActiveMQDestination; 20 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 21 import org.apache.activemq.broker.region.policy.PolicyEntry; 22 import org.apache.activemq.command.ActiveMQDestination; 23 import org.apache.activemq.command.ConsumerInfo; 24 import org.apache.activemq.memory.UsageManager; 25 import org.apache.activemq.thread.TaskRunnerFactory; 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 29 33 public class TempTopicRegion extends AbstractRegion{ 34 35 private static final Log log=LogFactory.getLog(TempTopicRegion.class); 36 37 public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics,UsageManager memoryManager, 38 TaskRunnerFactory taskRunnerFactory,DestinationFactory destinationFactory){ 39 super(broker,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory); 40 } 43 44 protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException { 45 if(info.isDurable()){ 46 throw new JMSException ("A durable subscription cannot be created for a temporary topic."); 47 } 48 try{ 49 TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager); 50 ActiveMQDestination destination=info.getDestination(); 52 if(destination!=null&&broker.getDestinationPolicy()!=null){ 53 PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination); 54 if(entry!=null){ 55 entry.configure(broker,memoryManager,answer); 56 } 57 } 58 answer.init(); 59 return answer; 60 }catch(Exception e){ 61 log.error("Failed to create TopicSubscription ",e); 62 JMSException jmsEx=new JMSException ("Couldn't create TopicSubscription"); 63 jmsEx.setLinkedException(e); 64 throw jmsEx; 65 } 66 } 67 68 public String toString(){ 69 return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory=" 70 +memoryManager.getPercentUsage()+"%"; 71 } 72 73 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 74 75 if( timeout == 0 ) 79 timeout = 1; 80 81 super.removeDestination(context, destination, timeout); 82 } 83 } 84 | Popular Tags |