1 18 19 package org.apache.activemq.store.amq; 20 21 import java.io.IOException ; 22 import java.util.ArrayList ; 23 import java.util.Iterator ; 24 import java.util.LinkedHashMap ; 25 import java.util.Map ; 26 27 import javax.transaction.xa.XAException ; 28 29 import org.apache.activemq.command.JournalTopicAck; 30 import org.apache.activemq.command.JournalTransaction; 31 import org.apache.activemq.command.Message; 32 import org.apache.activemq.command.MessageAck; 33 import org.apache.activemq.command.TransactionId; 34 import org.apache.activemq.command.XATransactionId; 35 import org.apache.activemq.kaha.impl.async.Location; 36 import org.apache.activemq.store.TransactionRecoveryListener; 37 import org.apache.activemq.store.TransactionStore; 38 39 40 42 public class AMQTransactionStore implements TransactionStore { 43 44 private final AMQPersistenceAdapter peristenceAdapter; 45 Map <TransactionId, Tx> inflightTransactions = new LinkedHashMap <TransactionId, Tx>(); 46 Map <TransactionId, Tx> preparedTransactions = new LinkedHashMap <TransactionId, Tx>(); 47 private boolean doingRecover; 48 49 50 public static class TxOperation { 51 52 static final byte ADD_OPERATION_TYPE = 0; 53 static final byte REMOVE_OPERATION_TYPE = 1; 54 static final byte ACK_OPERATION_TYPE = 3; 55 56 public byte operationType; 57 public AMQMessageStore store; 58 public Object data; 59 public Location location; 60 61 public TxOperation(byte operationType, AMQMessageStore store, Object data, Location location) { 62 this.operationType=operationType; 63 this.store=store; 64 this.data=data; 65 this.location=location; 66 } 67 68 } 69 73 public static class Tx { 74 75 private final Location location; 76 private ArrayList <TxOperation> operations = new ArrayList <TxOperation>(); 77 78 public Tx(Location location) { 79 this.location=location; 80 } 81 82 public void add(AMQMessageStore store, Message msg, Location location) { 83 operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location)); 84 } 85 86 public void add(AMQMessageStore store, MessageAck ack) { 87 operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null)); 88 } 89 90 public void add(AMQTopicMessageStore store, JournalTopicAck ack) { 91 operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null)); 92 } 93 94 public Message[] getMessages() { 95 ArrayList <Object > list = new ArrayList <Object >(); 96 for (Iterator <TxOperation> iter = operations.iterator(); iter.hasNext();) { 97 TxOperation op = iter.next(); 98 if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) { 99 list.add(op.data); 100 } 101 } 102 Message rc[] = new Message[list.size()]; 103 list.toArray(rc); 104 return rc; 105 } 106 107 public MessageAck[] getAcks() { 108 ArrayList <Object > list = new ArrayList <Object >(); 109 for (Iterator <TxOperation> iter = operations.iterator(); iter.hasNext();) { 110 TxOperation op = iter.next(); 111 if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) { 112 list.add(op.data); 113 } 114 } 115 MessageAck rc[] = new MessageAck[list.size()]; 116 list.toArray(rc); 117 return rc; 118 } 119 120 public ArrayList <TxOperation> getOperations() { 121 return operations; 122 } 123 124 } 125 126 public AMQTransactionStore(AMQPersistenceAdapter adapter) { 127 this.peristenceAdapter = adapter; 128 } 129 130 134 public void prepare(TransactionId txid) throws IOException { 135 Tx tx=null; 136 synchronized(inflightTransactions){ 137 tx=inflightTransactions.remove(txid); 138 } 139 if(tx==null) 140 return; 141 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true); 142 synchronized(preparedTransactions){ 143 preparedTransactions.put(txid,tx); 144 } 145 } 146 147 151 public void replayPrepare(TransactionId txid) throws IOException { 152 Tx tx=null; 153 synchronized(inflightTransactions){ 154 tx=inflightTransactions.remove(txid); 155 } 156 if(tx==null) 157 return; 158 synchronized(preparedTransactions){ 159 preparedTransactions.put(txid,tx); 160 } 161 } 162 163 public Tx getTx(TransactionId txid,Location location){ 164 Tx tx=null; 165 synchronized(inflightTransactions){ 166 tx=inflightTransactions.get(txid); 167 } 168 if(tx==null){ 169 tx=new Tx(location); 170 inflightTransactions.put(txid,tx); 171 } 172 return tx; 173 } 174 175 179 public void commit(TransactionId txid,boolean wasPrepared) throws IOException { 180 Tx tx; 181 if(wasPrepared){ 182 synchronized(preparedTransactions){ 183 tx=preparedTransactions.remove(txid); 184 } 185 }else{ 186 synchronized(inflightTransactions){ 187 tx=inflightTransactions.remove(txid); 188 } 189 } 190 if(tx==null) 191 return; 192 if(txid.isXATransaction()){ 193 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true); 194 }else{ 195 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared), 196 true); 197 } 198 } 199 200 204 public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException { 205 if(wasPrepared){ 206 synchronized(preparedTransactions){ 207 return preparedTransactions.remove(txid); 208 } 209 }else{ 210 synchronized(inflightTransactions){ 211 return inflightTransactions.remove(txid); 212 } 213 } 214 } 215 216 220 public void rollback(TransactionId txid) throws IOException { 221 Tx tx=null; 222 synchronized(inflightTransactions){ 223 tx=inflightTransactions.remove(txid); 224 } 225 if(tx!=null) 226 synchronized(preparedTransactions){ 227 tx=preparedTransactions.remove(txid); 228 } 229 if(tx!=null){ 230 if(txid.isXATransaction()){ 231 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true); 232 }else{ 233 peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false), 234 true); 235 } 236 } 237 } 238 239 243 public void replayRollback(TransactionId txid) throws IOException { 244 boolean inflight=false; 245 synchronized(inflightTransactions){ 246 inflight=inflightTransactions.remove(txid)!=null; 247 } 248 if(inflight){ 249 synchronized(preparedTransactions){ 250 preparedTransactions.remove(txid); 251 } 252 } 253 } 254 255 public void start() throws Exception { 256 } 257 258 public void stop() throws Exception { 259 } 260 261 synchronized public void recover(TransactionRecoveryListener listener) throws IOException { 262 synchronized(inflightTransactions){ 264 inflightTransactions.clear(); 265 } 266 this.doingRecover=true; 267 try{ 268 Map <TransactionId, Tx> txs=null; 269 synchronized(preparedTransactions){ 270 txs=new LinkedHashMap <TransactionId, Tx>(preparedTransactions); 271 } 272 for(Iterator <TransactionId> iter=txs.keySet().iterator();iter.hasNext();){ 273 Object txid=iter.next(); 274 Tx tx=txs.get(txid); 275 listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks()); 276 } 277 }finally{ 278 this.doingRecover=false; 279 } 280 } 281 282 286 void addMessage(AMQMessageStore store, Message message, Location location) throws IOException { 287 Tx tx = getTx(message.getTransactionId(), location); 288 tx.add(store, message, location); 289 } 290 291 295 public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException { 296 Tx tx = getTx(ack.getTransactionId(), location); 297 tx.add(store, ack); 298 } 299 300 301 public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) { 302 Tx tx = getTx(ack.getTransactionId(), location); 303 tx.add(store, ack); 304 } 305 306 307 public Location checkpoint() throws IOException { 308 Location rc=null; 314 synchronized(inflightTransactions){ 315 for(Iterator <Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){ 316 Tx tx=iter.next(); 317 Location location=tx.location; 318 if(rc==null||rc.compareTo(location)<0){ 319 rc=location; 320 } 321 } 322 } 323 synchronized(preparedTransactions){ 324 for(Iterator <Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){ 325 Tx tx=iter.next(); 326 Location location=tx.location; 327 if(rc==null||rc.compareTo(location)<0){ 328 rc=location; 329 } 330 } 331 return rc; 332 } 333 } 334 335 public boolean isDoingRecover() { 336 return doingRecover; 337 } 338 339 340 } 341 | Popular Tags |