// KickJava - Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > apache > activemq > store > memory > MemoryTransactionStore


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.store.memory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.transaction.xa.XAException;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;

40 /**
41  * Provides a TransactionStore implementation that can create transaction aware
42  * MessageStore objects from non transaction aware MessageStore objects.
43  *
44  * @version $Revision: 1.4 $
45  */

46 public class MemoryTransactionStore implements TransactionStore {
47
48     ConcurrentHashMap JavaDoc inflightTransactions = new ConcurrentHashMap JavaDoc();
49
50     ConcurrentHashMap JavaDoc preparedTransactions = new ConcurrentHashMap JavaDoc();
51
52     private boolean doingRecover;
53
54     public static class Tx {
55         private ArrayList JavaDoc messages = new ArrayList JavaDoc();
56
57         private ArrayList JavaDoc acks = new ArrayList JavaDoc();
58
59         public void add(AddMessageCommand msg) {
60             messages.add(msg);
61         }
62
63         public void add(RemoveMessageCommand ack) {
64             acks.add(ack);
65         }
66
67         public Message[] getMessages() {
68             Message rc[] = new Message[messages.size()];
69             int count=0;
70             for (Iterator JavaDoc iter = messages.iterator(); iter.hasNext();) {
71                 AddMessageCommand cmd = (AddMessageCommand) iter.next();
72                 rc[count++] = cmd.getMessage();
73             }
74             return rc;
75         }
76
77         public MessageAck[] getAcks() {
78             MessageAck rc[] = new MessageAck[acks.size()];
79             int count=0;
80             for (Iterator JavaDoc iter = acks.iterator(); iter.hasNext();) {
81                 RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
82                 rc[count++] = cmd.getMessageAck();
83             }
84             return rc;
85         }
86
87         /**
88          * @throws IOException
89          */

90         public void commit() throws IOException JavaDoc {
91             // Do all the message adds.
92
for (Iterator JavaDoc iter = messages.iterator(); iter.hasNext();) {
93                 AddMessageCommand cmd = (AddMessageCommand) iter.next();
94                 cmd.run();
95             }
96             // And removes..
97
for (Iterator JavaDoc iter = acks.iterator(); iter.hasNext();) {
98                 RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
99                 cmd.run();
100             }
101         }
102     }
103
104     public interface AddMessageCommand {
105         Message getMessage();
106         void run() throws IOException JavaDoc;
107     }
108
109     public interface RemoveMessageCommand {
110         MessageAck getMessageAck();
111         void run() throws IOException JavaDoc;
112     }
113
114     public MessageStore proxy(MessageStore messageStore) {
115         return new ProxyMessageStore(messageStore) {
116             public void addMessage(ConnectionContext context, final Message send) throws IOException JavaDoc {
117                 MemoryTransactionStore.this.addMessage(getDelegate(), send);
118             }
119     
120             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException JavaDoc {
121                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
122             }
123         };
124     }
125
126     public TopicMessageStore proxy(TopicMessageStore messageStore) {
127         return new ProxyTopicMessageStore(messageStore) {
128             public void addMessage(ConnectionContext context, final Message send) throws IOException JavaDoc {
129                 MemoryTransactionStore.this.addMessage(getDelegate(), send);
130             }
131             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException JavaDoc {
132                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
133             }
134         };
135     }
136
137     /**
138      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
139      */

140     public void prepare(TransactionId txid) {
141         Tx tx = (Tx) inflightTransactions.remove(txid);
142         if (tx == null)
143             return;
144         preparedTransactions.put(txid, tx);
145     }
146
147     public Tx getTx(Object JavaDoc txid) {
148         Tx tx = (Tx) inflightTransactions.get(txid);
149         if (tx == null) {
150             tx = new Tx();
151             inflightTransactions.put(txid, tx);
152         }
153         return tx;
154     }
155
156     /**
157      * @throws XAException
158      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
159      */

160     public void commit(TransactionId txid, boolean wasPrepared) throws IOException JavaDoc {
161         
162         Tx tx;
163         if( wasPrepared ) {
164             tx = (Tx) preparedTransactions.remove(txid);
165         } else {
166             tx = (Tx) inflightTransactions.remove(txid);
167         }
168         
169         if( tx == null )
170             return;
171         tx.commit();
172         
173     }
174
175     /**
176      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
177      */

178     public void rollback(TransactionId txid) {
179         preparedTransactions.remove(txid);
180         inflightTransactions.remove(txid);
181     }
182
183     public void start() throws Exception JavaDoc {
184     }
185
186     public void stop() throws Exception JavaDoc {
187     }
188
189     synchronized public void recover(TransactionRecoveryListener listener) throws IOException JavaDoc {
190         // All the inflight transactions get rolled back..
191
inflightTransactions.clear();
192         this.doingRecover = true;
193         try {
194             for (Iterator JavaDoc iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
195                 Object JavaDoc txid = (Object JavaDoc) iter.next();
196                 Tx tx = (Tx) preparedTransactions.get(txid);
197                 listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
198             }
199         } finally {
200             this.doingRecover = false;
201         }
202     }
203
204     /**
205      * @param message
206      * @throws IOException
207      */

208     void addMessage(final MessageStore destination, final Message message) throws IOException JavaDoc {
209         
210         if( doingRecover )
211             return;
212         
213         if (message.getTransactionId()!=null) {
214             Tx tx = getTx(message.getTransactionId());
215             tx.add(new AddMessageCommand() {
216                 public Message getMessage() {
217                     return message;
218                 }
219                 public void run() throws IOException JavaDoc {
220                     destination.addMessage(null, message);
221                 }
222             });
223         } else {
224             destination.addMessage(null, message);
225         }
226     }
227     
228     /**
229      * @param ack
230      * @throws IOException
231      */

232     private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException JavaDoc {
233         if( doingRecover )
234             return;
235
236         if (ack.isInTransaction()) {
237             Tx tx = getTx(ack.getTransactionId());
238             tx.add(new RemoveMessageCommand() {
239                 public MessageAck getMessageAck() {
240                     return ack;
241                 }
242                 public void run() throws IOException JavaDoc {
243                     destination.removeMessage(null, ack);
244                 }
245             });
246         } else {
247             destination.removeMessage(null, ack);
248         }
249     }
250
251     public void delete() {
252         inflightTransactions.clear();
253         preparedTransactions.clear();
254         doingRecover=false;
255     }
256     
257 }

// Popular Tags (navigation footer of the scraped web listing; not part of the Java source)