KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > amq > AMQTransactionStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq.store.amq;
20
21 import java.io.IOException JavaDoc;
22 import java.util.ArrayList JavaDoc;
23 import java.util.Iterator JavaDoc;
24 import java.util.LinkedHashMap JavaDoc;
25 import java.util.Map JavaDoc;
26
27 import javax.transaction.xa.XAException JavaDoc;
28
29 import org.apache.activemq.command.JournalTopicAck;
30 import org.apache.activemq.command.JournalTransaction;
31 import org.apache.activemq.command.Message;
32 import org.apache.activemq.command.MessageAck;
33 import org.apache.activemq.command.TransactionId;
34 import org.apache.activemq.command.XATransactionId;
35 import org.apache.activemq.kaha.impl.async.Location;
36 import org.apache.activemq.store.TransactionRecoveryListener;
37 import org.apache.activemq.store.TransactionStore;
38
39
40 /**
41  */

42 public class AMQTransactionStore implements TransactionStore {
43
44     private final AMQPersistenceAdapter peristenceAdapter;
45     Map JavaDoc<TransactionId, Tx> inflightTransactions = new LinkedHashMap JavaDoc<TransactionId, Tx>();
46     Map JavaDoc<TransactionId, Tx> preparedTransactions = new LinkedHashMap JavaDoc<TransactionId, Tx>();
47     private boolean doingRecover;
48
49     
50     public static class TxOperation {
51         
52         static final byte ADD_OPERATION_TYPE = 0;
53         static final byte REMOVE_OPERATION_TYPE = 1;
54         static final byte ACK_OPERATION_TYPE = 3;
55         
56         public byte operationType;
57         public AMQMessageStore store;
58         public Object JavaDoc data;
59         public Location location;
60         
61         public TxOperation(byte operationType, AMQMessageStore store, Object JavaDoc data, Location location) {
62             this.operationType=operationType;
63             this.store=store;
64             this.data=data;
65             this.location=location;
66         }
67         
68     }
69     /**
70      * Operations
71      * @version $Revision: 1.6 $
72      */

73     public static class Tx {
74
75         private final Location location;
76         private ArrayList JavaDoc<TxOperation> operations = new ArrayList JavaDoc<TxOperation>();
77
78         public Tx(Location location) {
79             this.location=location;
80         }
81
82         public void add(AMQMessageStore store, Message msg, Location location) {
83             operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location));
84         }
85
86         public void add(AMQMessageStore store, MessageAck ack) {
87             operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null));
88         }
89
90         public void add(AMQTopicMessageStore store, JournalTopicAck ack) {
91             operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null));
92         }
93         
94         public Message[] getMessages() {
95             ArrayList JavaDoc<Object JavaDoc> list = new ArrayList JavaDoc<Object JavaDoc>();
96             for (Iterator JavaDoc<TxOperation> iter = operations.iterator(); iter.hasNext();) {
97                 TxOperation op = iter.next();
98                 if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
99                     list.add(op.data);
100                 }
101             }
102             Message rc[] = new Message[list.size()];
103             list.toArray(rc);
104             return rc;
105         }
106
107         public MessageAck[] getAcks() {
108             ArrayList JavaDoc<Object JavaDoc> list = new ArrayList JavaDoc<Object JavaDoc>();
109             for (Iterator JavaDoc<TxOperation> iter = operations.iterator(); iter.hasNext();) {
110                 TxOperation op = iter.next();
111                 if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
112                     list.add(op.data);
113                 }
114             }
115             MessageAck rc[] = new MessageAck[list.size()];
116             list.toArray(rc);
117             return rc;
118         }
119
120         public ArrayList JavaDoc<TxOperation> getOperations() {
121             return operations;
122         }
123
124     }
125
126     public AMQTransactionStore(AMQPersistenceAdapter adapter) {
127         this.peristenceAdapter = adapter;
128     }
129
130     /**
131      * @throws IOException
132      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
133      */

134     public void prepare(TransactionId txid) throws IOException JavaDoc{
135         Tx tx=null;
136         synchronized(inflightTransactions){
137             tx=inflightTransactions.remove(txid);
138         }
139         if(tx==null)
140             return;
141         peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
142         synchronized(preparedTransactions){
143             preparedTransactions.put(txid,tx);
144         }
145     }
146     
147     /**
148      * @throws IOException
149      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
150      */

151     public void replayPrepare(TransactionId txid) throws IOException JavaDoc{
152         Tx tx=null;
153         synchronized(inflightTransactions){
154             tx=inflightTransactions.remove(txid);
155         }
156         if(tx==null)
157             return;
158         synchronized(preparedTransactions){
159             preparedTransactions.put(txid,tx);
160         }
161     }
162
163     public Tx getTx(TransactionId txid,Location location){
164         Tx tx=null;
165         synchronized(inflightTransactions){
166             tx=inflightTransactions.get(txid);
167         }
168         if(tx==null){
169             tx=new Tx(location);
170             inflightTransactions.put(txid,tx);
171         }
172         return tx;
173     }
174
175     /**
176      * @throws XAException
177      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
178      */

179     public void commit(TransactionId txid,boolean wasPrepared) throws IOException JavaDoc{
180         Tx tx;
181         if(wasPrepared){
182             synchronized(preparedTransactions){
183                 tx=preparedTransactions.remove(txid);
184             }
185         }else{
186             synchronized(inflightTransactions){
187                 tx=inflightTransactions.remove(txid);
188             }
189         }
190         if(tx==null)
191             return;
192         if(txid.isXATransaction()){
193             peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
194         }else{
195             peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
196                     true);
197         }
198     }
199
200     /**
201      * @throws XAException
202      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
203      */

204     public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException JavaDoc{
205         if(wasPrepared){
206             synchronized(preparedTransactions){
207                 return preparedTransactions.remove(txid);
208             }
209         }else{
210             synchronized(inflightTransactions){
211                 return inflightTransactions.remove(txid);
212             }
213         }
214     }
215
216     /**
217      * @throws IOException
218      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
219      */

220     public void rollback(TransactionId txid) throws IOException JavaDoc{
221         Tx tx=null;
222         synchronized(inflightTransactions){
223             tx=inflightTransactions.remove(txid);
224         }
225         if(tx!=null)
226             synchronized(preparedTransactions){
227                 tx=preparedTransactions.remove(txid);
228             }
229         if(tx!=null){
230             if(txid.isXATransaction()){
231                 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
232             }else{
233                 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
234                         true);
235             }
236         }
237     }
238
239     /**
240      * @throws IOException
241      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
242      */

243     public void replayRollback(TransactionId txid) throws IOException JavaDoc{
244         boolean inflight=false;
245         synchronized(inflightTransactions){
246             inflight=inflightTransactions.remove(txid)!=null;
247         }
248         if(inflight){
249             synchronized(preparedTransactions){
250                 preparedTransactions.remove(txid);
251             }
252         }
253     }
254     
255     public void start() throws Exception JavaDoc {
256     }
257
258     public void stop() throws Exception JavaDoc {
259     }
260     
261     synchronized public void recover(TransactionRecoveryListener listener) throws IOException JavaDoc{
262         // All the in-flight transactions get rolled back..
263
synchronized(inflightTransactions){
264             inflightTransactions.clear();
265         }
266         this.doingRecover=true;
267         try{
268             Map JavaDoc<TransactionId, Tx> txs=null;
269             synchronized(preparedTransactions){
270                 txs=new LinkedHashMap JavaDoc<TransactionId, Tx>(preparedTransactions);
271             }
272             for(Iterator JavaDoc<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
273                 Object JavaDoc txid=iter.next();
274                 Tx tx=txs.get(txid);
275                 listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
276             }
277         }finally{
278             this.doingRecover=false;
279         }
280     }
281
282     /**
283      * @param message
284      * @throws IOException
285      */

286     void addMessage(AMQMessageStore store, Message message, Location location) throws IOException JavaDoc {
287         Tx tx = getTx(message.getTransactionId(), location);
288         tx.add(store, message, location);
289     }
290
291     /**
292      * @param ack
293      * @throws IOException
294      */

295     public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException JavaDoc {
296         Tx tx = getTx(ack.getTransactionId(), location);
297         tx.add(store, ack);
298     }
299     
300     
301     public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
302         Tx tx = getTx(ack.getTransactionId(), location);
303         tx.add(store, ack);
304     }
305
306
307     public Location checkpoint() throws IOException JavaDoc{
308         // Nothing really to checkpoint.. since, we don't
309
// checkpoint tx operations in to long term store until they are committed.
310
// But we keep track of the first location of an operation
311
// that was associated with an active tx. The journal can not
312
// roll over active tx records.
313
Location rc=null;
314         synchronized(inflightTransactions){
315             for(Iterator JavaDoc<Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){
316                 Tx tx=iter.next();
317                 Location location=tx.location;
318                 if(rc==null||rc.compareTo(location)<0){
319                     rc=location;
320                 }
321             }
322         }
323         synchronized(preparedTransactions){
324             for(Iterator JavaDoc<Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){
325                 Tx tx=iter.next();
326                 Location location=tx.location;
327                 if(rc==null||rc.compareTo(location)<0){
328                     rc=location;
329                 }
330             }
331             return rc;
332         }
333     }
334
335     public boolean isDoingRecover() {
336         return doingRecover;
337     }
338
339
340 }
341
Popular Tags