package org.apache.activemq.store.memory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.transaction.xa.XAException;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;

/**
 * A {@link TransactionStore} that keeps all transaction state in memory.
 *
 * <p>Message stores are wrapped with {@link #proxy(MessageStore)} or
 * {@link #proxy(TopicMessageStore)}. Adds and removes that belong to a
 * transaction are queued in a {@link Tx} and only applied to the wrapped
 * store when the transaction is committed; adds and removes outside a
 * transaction are applied to the wrapped store immediately.
 */
public class MemoryTransactionStore implements TransactionStore {

    /** Transactions that have queued work but have not been prepared, keyed by transaction id. */
    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();

    /** Transactions that have been prepared and await commit or rollback, keyed by transaction id. */
    ConcurrentHashMap<Object, Tx> preparedTransactions = new ConcurrentHashMap<Object, Tx>();

    /** While true, adds and removes arriving through the proxies are silently ignored. */
    private boolean doingRecover;

    /**
     * The work queued for a single transaction: message adds and message
     * removes (acks), each held in the order it was queued.
     */
    public static class Tx {

        private final List<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();

        private final List<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();

        /**
         * Queues a message add to be executed on commit.
         *
         * @param msg the deferred add
         */
        public void add(AddMessageCommand msg) {
            messages.add(msg);
        }

        /**
         * Queues a message removal to be executed on commit.
         *
         * @param ack the deferred removal
         */
        public void add(RemoveMessageCommand ack) {
            acks.add(ack);
        }

        /**
         * @return the messages of all queued adds, in queue order; never null
         */
        public Message[] getMessages() {
            Message[] rc = new Message[messages.size()];
            int count = 0;
            for (AddMessageCommand cmd : messages) {
                rc[count++] = cmd.getMessage();
            }
            return rc;
        }

        /**
         * @return the acks of all queued removals, in queue order; never null
         */
        public MessageAck[] getAcks() {
            MessageAck[] rc = new MessageAck[acks.size()];
            int count = 0;
            for (RemoveMessageCommand cmd : acks) {
                rc[count++] = cmd.getMessageAck();
            }
            return rc;
        }

        /**
         * Runs every queued add, then every queued removal. If a command
         * throws, the commands after it are not run.
         *
         * @throws IOException if any queued command fails
         */
        public void commit() throws IOException {
            for (AddMessageCommand cmd : messages) {
                cmd.run();
            }
            for (RemoveMessageCommand cmd : acks) {
                cmd.run();
            }
        }
    }

    /** A deferred message add: the message plus the action that stores it. */
    public interface AddMessageCommand {
        Message getMessage();

        void run() throws IOException;
    }

    /** A deferred message removal: the ack plus the action that applies it. */
    public interface RemoveMessageCommand {
        MessageAck getMessageAck();

        void run() throws IOException;
    }

    /**
     * Wraps a message store so that its adds and removes are routed through
     * this transaction store.
     *
     * @param messageStore the store to wrap
     * @return a proxy around {@code messageStore}
     */
    public MessageStore proxy(MessageStore messageStore) {
        return new ProxyMessageStore(messageStore) {
            public void addMessage(ConnectionContext context, final Message send) throws IOException {
                MemoryTransactionStore.this.addMessage(getDelegate(), send);
            }

            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
            }
        };
    }

    /**
     * Wraps a topic message store so that its adds and removes are routed
     * through this transaction store.
     *
     * @param messageStore the store to wrap
     * @return a proxy around {@code messageStore}
     */
    public TopicMessageStore proxy(TopicMessageStore messageStore) {
        return new ProxyTopicMessageStore(messageStore) {
            public void addMessage(ConnectionContext context, final Message send) throws IOException {
                MemoryTransactionStore.this.addMessage(getDelegate(), send);
            }

            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
            }
        };
    }

    /**
     * Moves a transaction from the in-flight set to the prepared set.
     * Does nothing if no in-flight transaction has this id.
     *
     * @param txid the transaction to prepare
     */
    public void prepare(TransactionId txid) {
        Tx tx = inflightTransactions.remove(txid);
        if (tx == null) {
            return;
        }
        preparedTransactions.put(txid, tx);
    }

    /**
     * Returns the in-flight transaction for the given id, creating and
     * registering an empty one if none exists yet.
     *
     * @param txid the transaction id
     * @return the in-flight transaction; never null
     */
    public Tx getTx(Object txid) {
        Tx tx = inflightTransactions.get(txid);
        if (tx == null) {
            Tx created = new Tx();
            // putIfAbsent keeps the lookup-or-create atomic: if another thread
            // registered a Tx for this id first, use theirs so that no queued
            // command ends up in an orphaned Tx.
            Tx existing = inflightTransactions.putIfAbsent(txid, created);
            tx = (existing != null) ? existing : created;
        }
        return tx;
    }

    /**
     * Commits a transaction by running all of its queued commands and
     * forgetting it. Does nothing if the transaction is not known.
     *
     * @param txid the transaction to commit
     * @param wasPrepared true to look the transaction up among the prepared
     *            transactions, false to look among the in-flight ones
     * @throws IOException if a queued command fails
     */
    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
        Tx tx;
        if (wasPrepared) {
            tx = preparedTransactions.remove(txid);
        } else {
            tx = inflightTransactions.remove(txid);
        }

        if (tx == null) {
            return;
        }
        tx.commit();
    }

    /**
     * Discards a transaction, prepared or in-flight, without running any of
     * its queued commands.
     *
     * @param txid the transaction to roll back
     */
    public void rollback(TransactionId txid) {
        preparedTransactions.remove(txid);
        inflightTransactions.remove(txid);
    }

    /** No-op. */
    public void start() throws Exception {
    }

    /** No-op. */
    public void stop() throws Exception {
    }

    /**
     * Drops all in-flight transactions and reports every prepared
     * transaction to the listener. While this runs, adds and removes
     * arriving through the proxies are ignored.
     *
     * @param listener receives each prepared transaction's id, messages and acks
     * @throws IOException if the listener fails
     */
    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
        inflightTransactions.clear();
        this.doingRecover = true;
        try {
            // Iterate entries rather than keys followed by get(): a concurrent
            // commit or rollback may remove a key between the two calls, which
            // would make get() return null.
            for (Map.Entry<Object, Tx> entry : preparedTransactions.entrySet()) {
                Tx tx = entry.getValue();
                listener.recover((XATransactionId) entry.getKey(), tx.getMessages(), tx.getAcks());
            }
        } finally {
            this.doingRecover = false;
        }
    }

    /**
     * Adds a message to {@code destination}, either immediately or, when the
     * message carries a transaction id, deferred until that transaction
     * commits. Ignored while a recovery is in progress.
     *
     * @param destination the store that finally receives the message
     * @param message the message to add
     * @throws IOException if the immediate add fails
     */
    void addMessage(final MessageStore destination, final Message message) throws IOException {
        if (doingRecover) {
            return;
        }

        if (message.getTransactionId() != null) {
            Tx tx = getTx(message.getTransactionId());
            tx.add(new AddMessageCommand() {
                public Message getMessage() {
                    return message;
                }

                public void run() throws IOException {
                    // The delegate is always called with a null ConnectionContext.
                    destination.addMessage(null, message);
                }
            });
        } else {
            destination.addMessage(null, message);
        }
    }

    /**
     * Removes a message from {@code destination}, either immediately or,
     * when the ack is in a transaction, deferred until that transaction
     * commits. Ignored while a recovery is in progress.
     *
     * @param destination the store the message is finally removed from
     * @param ack the acknowledgement identifying what to remove
     * @throws IOException if the immediate removal fails
     */
    private void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
        if (doingRecover) {
            return;
        }

        if (ack.isInTransaction()) {
            Tx tx = getTx(ack.getTransactionId());
            tx.add(new RemoveMessageCommand() {
                public MessageAck getMessageAck() {
                    return ack;
                }

                public void run() throws IOException {
                    // The delegate is always called with a null ConnectionContext.
                    destination.removeMessage(null, ack);
                }
            });
        } else {
            destination.removeMessage(null, ack);
        }
    }

    /**
     * Forgets every in-flight and prepared transaction and resets the
     * recovery flag.
     */
    public void delete() {
        inflightTransactions.clear();
        preparedTransactions.clear();
        doingRecover = false;
    }

}