1 18 package org.apache.activemq.store.kahadaptor; 19 20 import java.io.IOException ; 21 import java.util.ArrayList ; 22 import java.util.List ; 23 import org.apache.activemq.command.BaseCommand; 24 import org.apache.activemq.command.Message; 25 import org.apache.activemq.command.MessageAck; 26 import org.apache.activemq.command.TransactionId; 27 import org.apache.activemq.store.MessageStore; 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 35 class KahaTransaction{ 36 private static final Log log=LogFactory.getLog(KahaTransaction.class); 37 protected List list=new ArrayList (); 38 39 40 void add(KahaMessageStore store,BaseCommand command){ 41 TxCommand tx=new TxCommand(); 42 tx.setCommand(command); 43 tx.setMessageStoreKey(store.getId()); 44 list.add(tx); 45 } 46 47 Message[] getMessages(){ 48 List result=new ArrayList (); 49 for(int i=0;i<list.size();i++){ 50 TxCommand command=(TxCommand) list.get(i); 51 if(command.isAdd()){ 52 result.add(command.getCommand()); 53 } 54 } 55 Message[] messages=new Message[result.size()]; 56 return (Message[]) result.toArray(messages); 57 } 58 59 MessageAck[] getAcks(){ 60 List result=new ArrayList (); 61 for(int i=0;i<list.size();i++){ 62 TxCommand command=(TxCommand) list.get(i); 63 if(command.isRemove()){ 64 result.add(command.getCommand()); 65 } 66 } 67 MessageAck[] acks=new MessageAck[result.size()]; 68 return (MessageAck[]) result.toArray(acks); 69 } 70 71 void prepare(){} 72 73 void rollback(){ 74 list.clear(); 75 } 76 77 80 void commit(KahaTransactionStore transactionStore) throws IOException { 81 for(int i=0;i<list.size();i++){ 82 TxCommand command=(TxCommand) list.get(i); 83 MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey()); 84 if(command.isAdd()){ 85 ms.addMessage(null,(Message) command.getCommand()); 86 } 87 } 88 for(int i=0;i<list.size();i++){ 89 TxCommand command=(TxCommand) list.get(i); 90 MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey()); 91 if(command.isRemove()){ 92 ms.removeMessage(null,(MessageAck) command.getCommand()); 93 } 94 } 95 } 96 97 List getList(){ 98 return new ArrayList (list); 99 } 100 101 void setList(List list){ 102 this.list = list; 103 } 104 } 105 | Popular Tags |