KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > journal > JournalTransactionStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq.store.journal;
20
21 import java.io.IOException JavaDoc;
22 import java.util.ArrayList JavaDoc;
23 import java.util.Iterator JavaDoc;
24 import java.util.LinkedHashMap JavaDoc;
25 import java.util.List JavaDoc;
26 import java.util.Map JavaDoc;
27
28 import javax.transaction.xa.XAException JavaDoc;
29
30 import org.apache.activeio.journal.RecordLocation;
31 import org.apache.activemq.command.JournalTopicAck;
32 import org.apache.activemq.command.JournalTransaction;
33 import org.apache.activemq.command.Message;
34 import org.apache.activemq.command.MessageAck;
35 import org.apache.activemq.command.TransactionId;
36 import org.apache.activemq.command.XATransactionId;
37 import org.apache.activemq.store.TransactionRecoveryListener;
38 import org.apache.activemq.store.TransactionStore;
39
40
41 /**
42  */

43 public class JournalTransactionStore implements TransactionStore {
44
45     private final JournalPersistenceAdapter peristenceAdapter;
46     Map JavaDoc inflightTransactions = new LinkedHashMap JavaDoc();
47     Map JavaDoc preparedTransactions = new LinkedHashMap JavaDoc();
48     private boolean doingRecover;
49
50     
51     public static class TxOperation {
52         
53         static final byte ADD_OPERATION_TYPE = 0;
54         static final byte REMOVE_OPERATION_TYPE = 1;
55         static final byte ACK_OPERATION_TYPE = 3;
56         
57         public byte operationType;
58         public JournalMessageStore store;
59         public Object JavaDoc data;
60         
61         public TxOperation(byte operationType, JournalMessageStore store, Object JavaDoc data) {
62             this.operationType=operationType;
63             this.store=store;
64             this.data=data;
65         }
66         
67     }
68     /**
69      * Operations
70      * @version $Revision: 1.6 $
71      */

72     public static class Tx {
73
74         private final RecordLocation location;
75         private ArrayList JavaDoc operations = new ArrayList JavaDoc();
76
77         public Tx(RecordLocation location) {
78             this.location=location;
79         }
80
81         public void add(JournalMessageStore store, Message msg) {
82             operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
83         }
84
85         public void add(JournalMessageStore store, MessageAck ack) {
86             operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
87         }
88
89         public void add(JournalTopicMessageStore store, JournalTopicAck ack) {
90             operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
91         }
92         
93         public Message[] getMessages() {
94             ArrayList JavaDoc list = new ArrayList JavaDoc();
95             for (Iterator JavaDoc iter = operations.iterator(); iter.hasNext();) {
96                 TxOperation op = (TxOperation) iter.next();
97                 if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
98                     list.add(op.data);
99                 }
100             }
101             Message rc[] = new Message[list.size()];
102             list.toArray(rc);
103             return rc;
104         }
105
106         public MessageAck[] getAcks() {
107             ArrayList JavaDoc list = new ArrayList JavaDoc();
108             for (Iterator JavaDoc iter = operations.iterator(); iter.hasNext();) {
109                 TxOperation op = (TxOperation) iter.next();
110                 if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
111                     list.add(op.data);
112                 }
113             }
114             MessageAck rc[] = new MessageAck[list.size()];
115             list.toArray(rc);
116             return rc;
117         }
118
119         public ArrayList JavaDoc getOperations() {
120             return operations;
121         }
122
123     }
124
125     public JournalTransactionStore(JournalPersistenceAdapter adapter) {
126         this.peristenceAdapter = adapter;
127     }
128
129     /**
130      * @throws IOException
131      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
132      */

133     public void prepare(TransactionId txid) throws IOException JavaDoc{
134         Tx tx=null;
135         synchronized(inflightTransactions){
136             tx=(Tx)inflightTransactions.remove(txid);
137         }
138         if(tx==null)
139             return;
140         peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
141         synchronized(preparedTransactions){
142             preparedTransactions.put(txid,tx);
143         }
144     }
145     
146     /**
147      * @throws IOException
148      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
149      */

150     public void replayPrepare(TransactionId txid) throws IOException JavaDoc{
151         Tx tx=null;
152         synchronized(inflightTransactions){
153             tx=(Tx)inflightTransactions.remove(txid);
154         }
155         if(tx==null)
156             return;
157         synchronized(preparedTransactions){
158             preparedTransactions.put(txid,tx);
159         }
160     }
161
162     public Tx getTx(Object JavaDoc txid,RecordLocation location){
163         Tx tx=null;
164         synchronized(inflightTransactions){
165             tx=(Tx)inflightTransactions.get(txid);
166         }
167         if(tx==null){
168             tx=new Tx(location);
169             inflightTransactions.put(txid,tx);
170         }
171         return tx;
172     }
173
174     /**
175      * @throws XAException
176      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
177      */

178     public void commit(TransactionId txid,boolean wasPrepared) throws IOException JavaDoc{
179         Tx tx;
180         if(wasPrepared){
181             synchronized(preparedTransactions){
182                 tx=(Tx)preparedTransactions.remove(txid);
183             }
184         }else{
185             synchronized(inflightTransactions){
186                 tx=(Tx)inflightTransactions.remove(txid);
187             }
188         }
189         if(tx==null)
190             return;
191         if(txid.isXATransaction()){
192             peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
193         }else{
194             peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
195                     true);
196         }
197     }
198
199     /**
200      * @throws XAException
201      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
202      */

203     public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException JavaDoc{
204         if(wasPrepared){
205             synchronized(preparedTransactions){
206                 return (Tx)preparedTransactions.remove(txid);
207             }
208         }else{
209             synchronized(inflightTransactions){
210                 return (Tx)inflightTransactions.remove(txid);
211             }
212         }
213     }
214
215     /**
216      * @throws IOException
217      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
218      */

219     public void rollback(TransactionId txid) throws IOException JavaDoc{
220         Tx tx=null;
221         synchronized(inflightTransactions){
222             tx=(Tx)inflightTransactions.remove(txid);
223         }
224         if(tx!=null)
225             synchronized(preparedTransactions){
226                 tx=(Tx)preparedTransactions.remove(txid);
227             }
228         if(tx!=null){
229             if(txid.isXATransaction()){
230                 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
231             }else{
232                 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
233                         true);
234             }
235         }
236     }
237
238     /**
239      * @throws IOException
240      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
241      */

242     public void replayRollback(TransactionId txid) throws IOException JavaDoc{
243         boolean inflight=false;
244         synchronized(inflightTransactions){
245             inflight=inflightTransactions.remove(txid)!=null;
246         }
247         if(inflight){
248             synchronized(preparedTransactions){
249                 preparedTransactions.remove(txid);
250             }
251         }
252     }
253     
254     public void start() throws Exception JavaDoc {
255     }
256
257     public void stop() throws Exception JavaDoc {
258     }
259     
260     synchronized public void recover(TransactionRecoveryListener listener) throws IOException JavaDoc{
261         // All the in-flight transactions get rolled back..
262
synchronized(inflightTransactions){
263             inflightTransactions.clear();
264         }
265         this.doingRecover=true;
266         try{
267             Map JavaDoc txs=null;
268             synchronized(preparedTransactions){
269                 txs=new LinkedHashMap JavaDoc(preparedTransactions);
270             }
271             for(Iterator JavaDoc iter=txs.keySet().iterator();iter.hasNext();){
272                 Object JavaDoc txid=(Object JavaDoc)iter.next();
273                 Tx tx=(Tx)txs.get(txid);
274                 listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
275             }
276         }finally{
277             this.doingRecover=false;
278         }
279     }
280
281     /**
282      * @param message
283      * @throws IOException
284      */

285     void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException JavaDoc {
286         Tx tx = getTx(message.getTransactionId(), location);
287         tx.add(store, message);
288     }
289
290     /**
291      * @param ack
292      * @throws IOException
293      */

294     public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) throws IOException JavaDoc {
295         Tx tx = getTx(ack.getTransactionId(), location);
296         tx.add(store, ack);
297     }
298     
299     
300     public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
301         Tx tx = getTx(ack.getTransactionId(), location);
302         tx.add(store, ack);
303     }
304
305
306     public RecordLocation checkpoint() throws IOException JavaDoc{
307         // Nothing really to checkpoint.. since, we don't
308
// checkpoint tx operations in to long term store until they are committed.
309
// But we keep track of the first location of an operation
310
// that was associated with an active tx. The journal can not
311
// roll over active tx records.
312
RecordLocation rc=null;
313         synchronized(inflightTransactions){
314             for(Iterator JavaDoc iter=inflightTransactions.values().iterator();iter.hasNext();){
315                 Tx tx=(Tx)iter.next();
316                 RecordLocation location=tx.location;
317                 if(rc==null||rc.compareTo(location)<0){
318                     rc=location;
319                 }
320             }
321         }
322         synchronized(preparedTransactions){
323             for(Iterator JavaDoc iter=preparedTransactions.values().iterator();iter.hasNext();){
324                 Tx tx=(Tx)iter.next();
325                 RecordLocation location=tx.location;
326                 if(rc==null||rc.compareTo(location)<0){
327                     rc=location;
328                 }
329             }
330             return rc;
331         }
332     }
333
334     public boolean isDoingRecover() {
335         return doingRecover;
336     }
337
338
339 }
340
Popular Tags