1 18 package org.apache.activemq.broker; 19 20 import java.util.concurrent.ConcurrentHashMap ; 21 22 import org.apache.activemq.command.ConnectionInfo; 23 import org.apache.activemq.command.LocalTransactionId; 24 import org.apache.activemq.command.Message; 25 import org.apache.activemq.command.MessageAck; 26 import org.apache.activemq.command.TransactionId; 27 import org.apache.activemq.command.XATransactionId; 28 import org.apache.activemq.store.TransactionRecoveryListener; 29 import org.apache.activemq.store.TransactionStore; 30 import org.apache.activemq.transaction.LocalTransaction; 31 import org.apache.activemq.transaction.Transaction; 32 import org.apache.activemq.transaction.XATransaction; 33 import org.apache.activemq.util.IOExceptionSupport; 34 import org.apache.activemq.util.WrappedException; 35 import org.apache.commons.logging.Log; 36 import org.apache.commons.logging.LogFactory; 37 38 import javax.jms.JMSException ; 39 import javax.transaction.xa.XAException ; 40 41 import java.util.ArrayList ; 42 import java.util.Iterator ; 43 import java.util.LinkedHashMap ; 44 import java.util.Map ; 45 46 51 public class TransactionBroker extends BrokerFilter { 52 53 private static final Log log = LogFactory.getLog(TransactionBroker.class); 54 55 private TransactionStore transactionStore; 57 private Map xaTransactions = new LinkedHashMap (); 58 59 public TransactionBroker(Broker next, TransactionStore transactionStore) { 60 super(next); 61 this.transactionStore=transactionStore; 62 } 63 64 70 73 public void start() throws Exception { 74 transactionStore.start(); 75 try { 76 final ConnectionContext context = new ConnectionContext(); 77 context.setBroker(this); 78 context.setInRecoveryMode(true); 79 context.setTransactions(new ConcurrentHashMap ()); 80 context.setProducerFlowControl(false); 81 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 82 producerExchange.setMutable(true); 83 producerExchange.setConnectionContext(context); 84 final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange(); 85 consumerExchange.setConnectionContext(context); 86 transactionStore.recover(new TransactionRecoveryListener() { 87 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) { 88 try { 89 beginTransaction(context, xid); 90 for (int i = 0; i < addedMessages.length; i++) { 91 send(producerExchange, addedMessages[i]); 92 } 93 for (int i = 0; i < aks.length; i++) { 94 acknowledge(consumerExchange, aks[i]); 95 } 96 prepareTransaction(context, xid); 97 } catch (Throwable e) { 98 throw new WrappedException(e); 99 } 100 } 101 }); 102 } catch (WrappedException e) { 103 Throwable cause = e.getCause(); 104 throw IOExceptionSupport.create("Recovery Failed: "+cause.getMessage(), cause); 105 } 106 next.start(); 107 } 108 109 public void stop() throws Exception { 110 transactionStore.stop(); 111 next.stop(); 112 } 113 114 115 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 121 ArrayList txs = new ArrayList (); 122 synchronized(xaTransactions){ 123 for(Iterator iter=xaTransactions.values().iterator();iter.hasNext();){ 124 Transaction tx=(Transaction)iter.next(); 125 if(tx.isPrepared()) 126 txs.add(tx.getTransactionId()); 127 } 128 } 129 XATransactionId rc[] = new XATransactionId[txs.size()]; 130 txs.toArray(rc); 131 return rc; 132 } 133 134 public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception { 135 if(xid.isXATransaction()){ 137 Transaction transaction=null; 138 synchronized(xaTransactions){ 139 transaction=(Transaction)xaTransactions.get(xid); 140 if(transaction!=null) 141 return; 142 transaction=new XATransaction(transactionStore,(XATransactionId)xid,this); 143 xaTransactions.put(xid,transaction); 144 } 145 }else{ 146 Map transactionMap=context.getTransactions(); 147 Transaction transaction=(Transaction)transactionMap.get(xid); 148 if(transaction!=null) 149 throw new JMSException ("Transaction '"+xid+"' has already been started."); 150 transaction=new LocalTransaction(transactionStore,(LocalTransactionId)xid,context); 151 transactionMap.put(xid,transaction); 152 } 153 } 154 155 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 156 Transaction transaction = getTransaction(context, xid, false); 157 return transaction.prepare(); 158 } 159 160 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 161 Transaction transaction = getTransaction(context, xid, true); 162 transaction.commit(onePhase); 163 } 164 165 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 166 Transaction transaction = getTransaction(context, xid, true); 167 transaction.rollback(); 168 } 169 170 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception { 171 Transaction transaction = getTransaction(context, xid, true); 172 transaction.rollback(); 173 } 174 175 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 176 final ConnectionContext context = consumerExchange.getConnectionContext(); 179 Transaction originalTx = context.getTransaction(); 180 Transaction transaction=null; 181 if( ack.isInTransaction() ) { 182 transaction = getTransaction(context, ack.getTransactionId(), false); 183 } 184 context.setTransaction(transaction); 185 try { 186 next.acknowledge(consumerExchange, ack); 187 } finally { 188 context.setTransaction(originalTx); 189 } 190 } 191 192 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 193 final ConnectionContext context = producerExchange.getConnectionContext(); 196 Transaction originalTx = context.getTransaction(); 197 Transaction transaction=null; 198 if( message.getTransactionId()!=null ) { 199 transaction = getTransaction(context, message.getTransactionId(), false); 200 } 201 context.setTransaction(transaction); 202 try { 203 next.send(producerExchange, message); 204 } finally { 205 context.setTransaction(originalTx); 206 } 207 } 208 209 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 210 for (Iterator iter = context.getTransactions().values().iterator(); iter.hasNext();) { 211 try { 212 Transaction transaction = (Transaction) iter.next(); 213 transaction.rollback(); 214 } 215 catch (Exception e) { 216 log.warn("ERROR Rolling back disconnected client's transactions: ", e); 217 } 218 iter.remove(); 219 } 220 next.removeConnection(context, info, error); 221 } 222 223 public Transaction getTransaction(ConnectionContext context,TransactionId xid,boolean mightBePrepared) 229 throws JMSException ,XAException { 230 Map transactionMap=null; 231 synchronized(xaTransactions){ 232 transactionMap=xid.isXATransaction()?xaTransactions:context.getTransactions(); 233 } 234 Transaction transaction=(Transaction)transactionMap.get(xid); 235 if(transaction!=null) 236 return transaction; 237 if(xid.isXATransaction()){ 238 XAException e=new XAException ("Transaction '"+xid+"' has not been started."); 239 e.errorCode=XAException.XAER_NOTA; 240 throw e; 241 }else{ 242 throw new JMSException ("Transaction '"+xid+"' has not been started."); 243 } 244 } 245 246 public void removeTransaction(XATransactionId xid){ 247 synchronized(xaTransactions){ 248 xaTransactions.remove(xid); 249 } 250 } 251 252 } 253 | Popular Tags |