1 18 package org.apache.activemq.broker.region; 19 20 import java.io.IOException ; 21 import javax.jms.InvalidSelectorException ; 22 import javax.jms.JMSException ; 23 import org.apache.activemq.broker.Broker; 24 import org.apache.activemq.broker.ConnectionContext; 25 import org.apache.activemq.broker.region.group.MessageGroupMap; 26 import org.apache.activemq.command.ActiveMQMessage; 27 import org.apache.activemq.command.ConsumerId; 28 import org.apache.activemq.command.ConsumerInfo; 29 import org.apache.activemq.command.Message; 30 import org.apache.activemq.command.MessageAck; 31 import org.apache.activemq.transaction.Synchronization; 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 35 public class QueueSubscription extends PrefetchSubscription implements LockOwner { 36 37 private static final Log log = LogFactory.getLog(QueueSubscription.class); 38 39 public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { 40 super(broker,context, info); 41 } 42 43 48 protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { 49 50 51 52 final Destination q = n.getRegionDestination(); 53 q.acknowledge(context, this, ack, n); 54 55 final QueueMessageReference node = (QueueMessageReference) n; 56 final Queue queue = (Queue)q; 57 if( !ack.isInTransaction() ) { 58 node.drop(); 59 queue.dropEvent(); 60 } else { 61 node.setAcked(true); 62 context.getTransaction().addSynchronization(new Synchronization(){ 63 public void afterCommit() throws Exception { 64 node.drop(); 65 queue.dropEvent(); 66 } 67 public void afterRollback() throws Exception { 68 node.setAcked(false); 69 } 70 }); 71 } 72 } 73 74 protected boolean canDispatch(MessageReference n) throws IOException { 75 QueueMessageReference node = (QueueMessageReference) n; 76 if( node.isAcked()) 77 return false; 78 String groupId = node.getGroupID(); 80 int sequence = node.getGroupSequence(); 81 if( groupId!=null ) { 82 MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners(); 83 84 if( sequence == 1 ) { 86 if( node.lock(this) ) { 87 assignGroupToMe(messageGroupOwners, n, groupId); 88 return true; 89 } else { 90 return false; 91 } 92 } 93 94 ConsumerId groupOwner; 97 synchronized(node) { 98 groupOwner = messageGroupOwners.get(groupId); 99 if( groupOwner==null ) { 100 if( node.lock(this) ) { 101 assignGroupToMe(messageGroupOwners, n, groupId); 102 return true; 103 } else { 104 return false; 105 } 106 } 107 } 108 109 if( groupOwner.equals(info.getConsumerId()) ) { 110 if ( sequence < 0 ) { 112 messageGroupOwners.removeGroup(groupId); 113 } 114 return true; 115 } 116 117 return false; 118 119 } else { 120 return node.lock(this); 121 } 122 } 123 124 128 protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { 129 messageGroupOwners.put(groupId, info.getConsumerId()); 130 Message message = n.getMessage(); 131 if (message instanceof ActiveMQMessage) { 132 ActiveMQMessage activeMessage = (ActiveMQMessage) message; 133 try { 134 activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false); 135 } 136 catch (JMSException e) { 137 log.warn("Failed to set boolean header: " + e, e); 138 } 139 } 140 } 141 142 public String toString() { 143 return 144 "QueueSubscription:" + 145 " consumer="+info.getConsumerId()+ 146 ", destinations="+destinations.size()+ 147 ", dispatched="+dispatched.size()+ 148 ", delivered="+this.prefetchExtension+ 149 ", pending="+getPendingQueueSize(); 150 } 151 152 public int getLockPriority() { 153 return info.getPriority(); 154 } 155 156 public boolean isLockExclusive() { 157 return info.isExclusive(); 158 } 159 160 167 protected boolean dispatch(MessageReference node) throws IOException { 168 boolean rc = false; 169 node.incrementReferenceCount(); 171 try { 172 rc = super.dispatch(node); 173 } finally { 174 if( !rc ) { 177 node.decrementReferenceCount(); 178 } 179 } 180 return rc; 181 } 182 183 188 protected void onDispatch(MessageReference node, Message message) { 189 node.decrementReferenceCount(); 192 super.onDispatch(node, message); 193 } 194 195 198 protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException , Exception { 199 node.incrementReferenceCount(); 201 try{ 202 super.sendToDLQ(context, node); 203 } finally { 204 node.decrementReferenceCount(); 206 } 207 } 208 209 211 public void destroy() { 212 } 213 214 } 215 | Popular Tags |