1 18 package org.apache.activemq.store.kahadaptor; 19 20 import java.io.IOException ; 21 import java.util.Iterator ; 22 import java.util.Map ; 23 import java.util.Map.Entry; 24 import javax.transaction.xa.XAException ; 25 import org.apache.activemq.broker.ConnectionContext; 26 import org.apache.activemq.command.Message; 27 import org.apache.activemq.command.MessageAck; 28 import org.apache.activemq.command.TransactionId; 29 import org.apache.activemq.command.XATransactionId; 30 import org.apache.activemq.store.MessageStore; 31 import org.apache.activemq.store.ProxyMessageStore; 32 import org.apache.activemq.store.ProxyTopicMessageStore; 33 import org.apache.activemq.store.TopicMessageStore; 34 import org.apache.activemq.store.TransactionRecoveryListener; 35 import org.apache.activemq.store.TransactionStore; 36 import java.util.concurrent.ConcurrentHashMap ; 37 43 public class KahaTransactionStore implements TransactionStore{ 44 private Map transactions=new ConcurrentHashMap (); 45 private Map prepared; 46 private KahaPersistenceAdapter adaptor; 47 48 KahaTransactionStore(KahaPersistenceAdapter adaptor,Map preparedMap){ 49 this.adaptor=adaptor; 50 this.prepared=preparedMap; 51 } 52 53 public MessageStore proxy(MessageStore messageStore){ 54 return new ProxyMessageStore(messageStore){ 55 public void addMessage(ConnectionContext context,final Message send) throws IOException { 56 KahaTransactionStore.this.addMessage(getDelegate(),send); 57 } 58 59 public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException { 60 KahaTransactionStore.this.removeMessage(getDelegate(),ack); 61 } 62 }; 63 } 64 65 public TopicMessageStore proxy(TopicMessageStore messageStore){ 66 return new ProxyTopicMessageStore(messageStore){ 67 public void addMessage(ConnectionContext context,final Message send) throws IOException { 68 KahaTransactionStore.this.addMessage(getDelegate(),send); 69 } 70 71 public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException { 72 KahaTransactionStore.this.removeMessage(getDelegate(),ack); 73 } 74 }; 75 } 76 77 80 public void prepare(TransactionId txid){ 81 KahaTransaction tx=getTx(txid); 82 if(tx!=null){ 83 tx.prepare(); 84 prepared.put(txid,tx); 85 } 86 } 87 88 92 public void commit(TransactionId txid,boolean wasPrepared) throws IOException { 93 KahaTransaction tx=getTx(txid); 94 if(tx!=null){ 95 tx.commit(this); 96 removeTx(txid); 97 } 98 } 99 100 103 public void rollback(TransactionId txid){ 104 KahaTransaction tx=getTx(txid); 105 if(tx!=null){ 106 tx.rollback(); 107 removeTx(txid); 108 } 109 } 110 111 public void start() throws Exception {} 112 113 public void stop() throws Exception {} 114 115 synchronized public void recover(TransactionRecoveryListener listener) throws IOException { 116 for(Iterator i=prepared.entrySet().iterator();i.hasNext();){ 117 Map.Entry entry=(Entry) i.next(); 118 XATransactionId xid=(XATransactionId) entry.getKey(); 119 KahaTransaction kt=(KahaTransaction) entry.getValue(); 120 listener.recover(xid,kt.getMessages(),kt.getAcks()); 121 } 122 } 123 124 128 void addMessage(final MessageStore destination,final Message message) throws IOException { 129 if(message.isInTransaction()){ 130 KahaTransaction tx=getOrCreateTx(message.getTransactionId()); 131 tx.add((KahaMessageStore) destination,message); 132 }else{ 133 destination.addMessage(null,message); 134 } 135 } 136 137 141 private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException { 142 if(ack.isInTransaction()){ 143 KahaTransaction tx=getOrCreateTx(ack.getTransactionId()); 144 tx.add((KahaMessageStore) destination,ack); 145 }else{ 146 destination.removeMessage(null,ack); 147 } 148 } 149 150 protected synchronized KahaTransaction getTx(TransactionId key){ 151 KahaTransaction result=(KahaTransaction) transactions.get(key); 152 if(result==null){ 153 result=(KahaTransaction) prepared.get(key); 154 } 155 return result; 156 } 157 158 protected synchronized KahaTransaction getOrCreateTx(TransactionId key){ 159 KahaTransaction result=(KahaTransaction) transactions.get(key); 160 if(result==null){ 161 result=new KahaTransaction(); 162 transactions.put(key,result); 163 } 164 return result; 165 } 166 167 protected synchronized void removeTx(TransactionId key){ 168 transactions.remove(key); 169 prepared.remove(key); 170 } 171 172 public void delete(){ 173 transactions.clear(); 174 prepared.clear(); 175 } 176 177 protected MessageStore getStoreById(Object id){ 178 return adaptor.retrieveMessageStore(id); 179 } 180 } 181 | Popular Tags |