1 18 19 package org.apache.activemq.store.journal; 20 21 import java.io.IOException ; 22 import java.util.ArrayList ; 23 import java.util.Iterator ; 24 import java.util.LinkedHashMap ; 25 import java.util.List ; 26 import java.util.Map ; 27 28 import javax.transaction.xa.XAException ; 29 30 import org.apache.activeio.journal.RecordLocation; 31 import org.apache.activemq.command.JournalTopicAck; 32 import org.apache.activemq.command.JournalTransaction; 33 import org.apache.activemq.command.Message; 34 import org.apache.activemq.command.MessageAck; 35 import org.apache.activemq.command.TransactionId; 36 import org.apache.activemq.command.XATransactionId; 37 import org.apache.activemq.store.TransactionRecoveryListener; 38 import org.apache.activemq.store.TransactionStore; 39 40 41 43 public class JournalTransactionStore implements TransactionStore { 44 45 private final JournalPersistenceAdapter peristenceAdapter; 46 Map inflightTransactions = new LinkedHashMap (); 47 Map preparedTransactions = new LinkedHashMap (); 48 private boolean doingRecover; 49 50 51 public static class TxOperation { 52 53 static final byte ADD_OPERATION_TYPE = 0; 54 static final byte REMOVE_OPERATION_TYPE = 1; 55 static final byte ACK_OPERATION_TYPE = 3; 56 57 public byte operationType; 58 public JournalMessageStore store; 59 public Object data; 60 61 public TxOperation(byte operationType, JournalMessageStore store, Object data) { 62 this.operationType=operationType; 63 this.store=store; 64 this.data=data; 65 } 66 67 } 68 72 public static class Tx { 73 74 private final RecordLocation location; 75 private ArrayList operations = new ArrayList (); 76 77 public Tx(RecordLocation location) { 78 this.location=location; 79 } 80 81 public void add(JournalMessageStore store, Message msg) { 82 operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg)); 83 } 84 85 public void add(JournalMessageStore store, MessageAck ack) { 86 operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack)); 87 } 88 89 public void add(JournalTopicMessageStore store, JournalTopicAck ack) { 90 operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack)); 91 } 92 93 public Message[] getMessages() { 94 ArrayList list = new ArrayList (); 95 for (Iterator iter = operations.iterator(); iter.hasNext();) { 96 TxOperation op = (TxOperation) iter.next(); 97 if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) { 98 list.add(op.data); 99 } 100 } 101 Message rc[] = new Message[list.size()]; 102 list.toArray(rc); 103 return rc; 104 } 105 106 public MessageAck[] getAcks() { 107 ArrayList list = new ArrayList (); 108 for (Iterator iter = operations.iterator(); iter.hasNext();) { 109 TxOperation op = (TxOperation) iter.next(); 110 if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) { 111 list.add(op.data); 112 } 113 } 114 MessageAck rc[] = new MessageAck[list.size()]; 115 list.toArray(rc); 116 return rc; 117 } 118 119 public ArrayList getOperations() { 120 return operations; 121 } 122 123 } 124 125 public JournalTransactionStore(JournalPersistenceAdapter adapter) { 126 this.peristenceAdapter = adapter; 127 } 128 129 133 public void prepare(TransactionId txid) throws IOException { 134 Tx tx=null; 135 synchronized(inflightTransactions){ 136 tx=(Tx)inflightTransactions.remove(txid); 137 } 138 if(tx==null) 139 return; 140 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true); 141 synchronized(preparedTransactions){ 142 preparedTransactions.put(txid,tx); 143 } 144 } 145 146 150 public void replayPrepare(TransactionId txid) throws IOException { 151 Tx tx=null; 152 synchronized(inflightTransactions){ 153 tx=(Tx)inflightTransactions.remove(txid); 154 } 155 if(tx==null) 156 return; 157 synchronized(preparedTransactions){ 158 preparedTransactions.put(txid,tx); 159 } 160 } 161 162 public Tx getTx(Object txid,RecordLocation location){ 163 Tx tx=null; 164 synchronized(inflightTransactions){ 165 tx=(Tx)inflightTransactions.get(txid); 166 } 167 if(tx==null){ 168 tx=new Tx(location); 169 inflightTransactions.put(txid,tx); 170 } 171 return tx; 172 } 173 174 178 public void commit(TransactionId txid,boolean wasPrepared) throws IOException { 179 Tx tx; 180 if(wasPrepared){ 181 synchronized(preparedTransactions){ 182 tx=(Tx)preparedTransactions.remove(txid); 183 } 184 }else{ 185 synchronized(inflightTransactions){ 186 tx=(Tx)inflightTransactions.remove(txid); 187 } 188 } 189 if(tx==null) 190 return; 191 if(txid.isXATransaction()){ 192 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true); 193 }else{ 194 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared), 195 true); 196 } 197 } 198 199 203 public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException { 204 if(wasPrepared){ 205 synchronized(preparedTransactions){ 206 return (Tx)preparedTransactions.remove(txid); 207 } 208 }else{ 209 synchronized(inflightTransactions){ 210 return (Tx)inflightTransactions.remove(txid); 211 } 212 } 213 } 214 215 219 public void rollback(TransactionId txid) throws IOException { 220 Tx tx=null; 221 synchronized(inflightTransactions){ 222 tx=(Tx)inflightTransactions.remove(txid); 223 } 224 if(tx!=null) 225 synchronized(preparedTransactions){ 226 tx=(Tx)preparedTransactions.remove(txid); 227 } 228 if(tx!=null){ 229 if(txid.isXATransaction()){ 230 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true); 231 }else{ 232 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false), 233 true); 234 } 235 } 236 } 237 238 242 public void replayRollback(TransactionId txid) throws IOException { 243 boolean inflight=false; 244 synchronized(inflightTransactions){ 245 inflight=inflightTransactions.remove(txid)!=null; 246 } 247 if(inflight){ 248 synchronized(preparedTransactions){ 249 preparedTransactions.remove(txid); 250 } 251 } 252 } 253 254 public void start() throws Exception { 255 } 256 257 public void stop() throws Exception { 258 } 259 260 synchronized public void recover(TransactionRecoveryListener listener) throws IOException { 261 synchronized(inflightTransactions){ 263 inflightTransactions.clear(); 264 } 265 this.doingRecover=true; 266 try{ 267 Map txs=null; 268 synchronized(preparedTransactions){ 269 txs=new LinkedHashMap (preparedTransactions); 270 } 271 for(Iterator iter=txs.keySet().iterator();iter.hasNext();){ 272 Object txid=(Object )iter.next(); 273 Tx tx=(Tx)txs.get(txid); 274 listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks()); 275 } 276 }finally{ 277 this.doingRecover=false; 278 } 279 } 280 281 285 void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException { 286 Tx tx = getTx(message.getTransactionId(), location); 287 tx.add(store, message); 288 } 289 290 294 public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) throws IOException { 295 Tx tx = getTx(ack.getTransactionId(), location); 296 tx.add(store, ack); 297 } 298 299 300 public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) { 301 Tx tx = getTx(ack.getTransactionId(), location); 302 tx.add(store, ack); 303 } 304 305 306 public RecordLocation checkpoint() throws IOException { 307 RecordLocation rc=null; 313 synchronized(inflightTransactions){ 314 for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){ 315 Tx tx=(Tx)iter.next(); 316 RecordLocation location=tx.location; 317 if(rc==null||rc.compareTo(location)<0){ 318 rc=location; 319 } 320 } 321 } 322 synchronized(preparedTransactions){ 323 for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){ 324 Tx tx=(Tx)iter.next(); 325 RecordLocation location=tx.location; 326 if(rc==null||rc.compareTo(location)<0){ 327 rc=location; 328 } 329 } 330 return rc; 331 } 332 } 333 334 public boolean isDoingRecover() { 335 return doingRecover; 336 } 337 338 339 } 340 | Popular Tags |