KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > TransactionBroker


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import java.util.concurrent.ConcurrentHashMap JavaDoc;
21
22 import org.apache.activemq.command.ConnectionInfo;
23 import org.apache.activemq.command.LocalTransactionId;
24 import org.apache.activemq.command.Message;
25 import org.apache.activemq.command.MessageAck;
26 import org.apache.activemq.command.TransactionId;
27 import org.apache.activemq.command.XATransactionId;
28 import org.apache.activemq.store.TransactionRecoveryListener;
29 import org.apache.activemq.store.TransactionStore;
30 import org.apache.activemq.transaction.LocalTransaction;
31 import org.apache.activemq.transaction.Transaction;
32 import org.apache.activemq.transaction.XATransaction;
33 import org.apache.activemq.util.IOExceptionSupport;
34 import org.apache.activemq.util.WrappedException;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37
38 import javax.jms.JMSException JavaDoc;
39 import javax.transaction.xa.XAException JavaDoc;
40
41 import java.util.ArrayList JavaDoc;
42 import java.util.Iterator JavaDoc;
43 import java.util.LinkedHashMap JavaDoc;
44 import java.util.Map JavaDoc;
45
46 /**
47  * This broker filter handles the transaction related operations in the Broker interface.
48  *
49  * @version $Revision: 1.10 $
50  */

51 public class TransactionBroker extends BrokerFilter {
52     
53     private static final Log log = LogFactory.getLog(TransactionBroker.class);
54
55     // The prepared XA transactions.
56
private TransactionStore transactionStore;
57     private Map JavaDoc xaTransactions = new LinkedHashMap JavaDoc();
58
59     public TransactionBroker(Broker next, TransactionStore transactionStore) {
60         super(next);
61         this.transactionStore=transactionStore;
62     }
63     
64     //////////////////////////////////////////////////////////////////////////////
65
//
66
// Life cycle Methods
67
//
68
//////////////////////////////////////////////////////////////////////////////
69

70     /**
71      * Recovers any prepared transactions.
72      */

73     public void start() throws Exception JavaDoc {
74         transactionStore.start();
75         try {
76             final ConnectionContext context = new ConnectionContext();
77             context.setBroker(this);
78             context.setInRecoveryMode(true);
79             context.setTransactions(new ConcurrentHashMap JavaDoc());
80             context.setProducerFlowControl(false);
81             final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
82             producerExchange.setMutable(true);
83             producerExchange.setConnectionContext(context);
84             final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
85             consumerExchange.setConnectionContext(context);
86             transactionStore.recover(new TransactionRecoveryListener() {
87                 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
88                     try {
89                         beginTransaction(context, xid);
90                         for (int i = 0; i < addedMessages.length; i++) {
91                             send(producerExchange, addedMessages[i]);
92                         }
93                         for (int i = 0; i < aks.length; i++) {
94                             acknowledge(consumerExchange, aks[i]);
95                         }
96                         prepareTransaction(context, xid);
97                     } catch (Throwable JavaDoc e) {
98                         throw new WrappedException(e);
99                     }
100                 }
101             });
102         } catch (WrappedException e) {
103             Throwable JavaDoc cause = e.getCause();
104             throw IOExceptionSupport.create("Recovery Failed: "+cause.getMessage(), cause);
105         }
106         next.start();
107     }
108     
109     public void stop() throws Exception JavaDoc {
110         transactionStore.stop();
111         next.stop();
112     }
113  
114
115     //////////////////////////////////////////////////////////////////////////////
116
//
117
// BrokerFilter overrides
118
//
119
//////////////////////////////////////////////////////////////////////////////
120
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception JavaDoc {
121         ArrayList JavaDoc txs = new ArrayList JavaDoc();
122         synchronized(xaTransactions){
123             for(Iterator JavaDoc iter=xaTransactions.values().iterator();iter.hasNext();){
124                 Transaction tx=(Transaction)iter.next();
125                 if(tx.isPrepared())
126                     txs.add(tx.getTransactionId());
127             }
128         }
129         XATransactionId rc[] = new XATransactionId[txs.size()];
130         txs.toArray(rc);
131         return rc;
132     }
133
134     public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception JavaDoc{
135         // the transaction may have already been started.
136
if(xid.isXATransaction()){
137             Transaction transaction=null;
138             synchronized(xaTransactions){
139                 transaction=(Transaction)xaTransactions.get(xid);
140                 if(transaction!=null)
141                     return;
142                 transaction=new XATransaction(transactionStore,(XATransactionId)xid,this);
143                 xaTransactions.put(xid,transaction);
144             }
145         }else{
146             Map JavaDoc transactionMap=context.getTransactions();
147             Transaction transaction=(Transaction)transactionMap.get(xid);
148             if(transaction!=null)
149                 throw new JMSException JavaDoc("Transaction '"+xid+"' has already been started.");
150             transaction=new LocalTransaction(transactionStore,(LocalTransactionId)xid,context);
151             transactionMap.put(xid,transaction);
152         }
153     }
154
155     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
156         Transaction transaction = getTransaction(context, xid, false);
157         return transaction.prepare();
158     }
159     
160     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception JavaDoc {
161         Transaction transaction = getTransaction(context, xid, true);
162         transaction.commit(onePhase);
163     }
164
165     public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
166         Transaction transaction = getTransaction(context, xid, true);
167         transaction.rollback();
168     }
169     
170     public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception JavaDoc {
171         Transaction transaction = getTransaction(context, xid, true);
172         transaction.rollback();
173     }
174     
175     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception JavaDoc {
176         // This method may be invoked recursively.
177
// Track original tx so that it can be restored.
178
final ConnectionContext context = consumerExchange.getConnectionContext();
179         Transaction originalTx = context.getTransaction();
180         Transaction transaction=null;
181         if( ack.isInTransaction() ) {
182             transaction = getTransaction(context, ack.getTransactionId(), false);
183         }
184         context.setTransaction(transaction);
185         try {
186             next.acknowledge(consumerExchange, ack);
187         } finally {
188             context.setTransaction(originalTx);
189         }
190     }
191     
192     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception JavaDoc {
193         // This method may be invoked recursively.
194
// Track original tx so that it can be restored.
195
final ConnectionContext context = producerExchange.getConnectionContext();
196         Transaction originalTx = context.getTransaction();
197         Transaction transaction=null;
198         if( message.getTransactionId()!=null ) {
199             transaction = getTransaction(context, message.getTransactionId(), false);
200         }
201         context.setTransaction(transaction);
202         try {
203             next.send(producerExchange, message);
204         } finally {
205             context.setTransaction(originalTx);
206         }
207     }
208     
209     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable JavaDoc error) throws Exception JavaDoc {
210         for (Iterator JavaDoc iter = context.getTransactions().values().iterator(); iter.hasNext();) {
211             try {
212                 Transaction transaction = (Transaction) iter.next();
213                 transaction.rollback();
214             }
215             catch (Exception JavaDoc e) {
216                 log.warn("ERROR Rolling back disconnected client's transactions: ", e);
217             }
218             iter.remove();
219         }
220         next.removeConnection(context, info, error);
221     }
222     
223     //////////////////////////////////////////////////////////////////////////////
224
//
225
// Implementation help methods.
226
//
227
//////////////////////////////////////////////////////////////////////////////
228
public Transaction getTransaction(ConnectionContext context,TransactionId xid,boolean mightBePrepared)
229             throws JMSException JavaDoc,XAException JavaDoc{
230         Map JavaDoc transactionMap=null;
231         synchronized(xaTransactions){
232             transactionMap=xid.isXATransaction()?xaTransactions:context.getTransactions();
233         }
234         Transaction transaction=(Transaction)transactionMap.get(xid);
235         if(transaction!=null)
236             return transaction;
237         if(xid.isXATransaction()){
238             XAException JavaDoc e=new XAException JavaDoc("Transaction '"+xid+"' has not been started.");
239             e.errorCode=XAException.XAER_NOTA;
240             throw e;
241         }else{
242             throw new JMSException JavaDoc("Transaction '"+xid+"' has not been started.");
243         }
244     }
245
246     public void removeTransaction(XATransactionId xid){
247         synchronized(xaTransactions){
248             xaTransactions.remove(xid);
249         }
250     }
251
252 }
253
Popular Tags