KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > kahadaptor > KahaTransactionStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.kahadaptor;
19
20 import java.io.IOException JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.Map JavaDoc;
23 import java.util.Map.Entry;
24 import javax.transaction.xa.XAException JavaDoc;
25 import org.apache.activemq.broker.ConnectionContext;
26 import org.apache.activemq.command.Message;
27 import org.apache.activemq.command.MessageAck;
28 import org.apache.activemq.command.TransactionId;
29 import org.apache.activemq.command.XATransactionId;
30 import org.apache.activemq.store.MessageStore;
31 import org.apache.activemq.store.ProxyMessageStore;
32 import org.apache.activemq.store.ProxyTopicMessageStore;
33 import org.apache.activemq.store.TopicMessageStore;
34 import org.apache.activemq.store.TransactionRecoveryListener;
35 import org.apache.activemq.store.TransactionStore;
36 import java.util.concurrent.ConcurrentHashMap JavaDoc;
37 /**
38  * Provides a TransactionStore implementation that can create transaction aware MessageStore objects from non
39  * transaction aware MessageStore objects.
40  *
41  * @version $Revision: 1.4 $
42  */

43 public class KahaTransactionStore implements TransactionStore{
44     private Map JavaDoc transactions=new ConcurrentHashMap JavaDoc();
45     private Map JavaDoc prepared;
46     private KahaPersistenceAdapter adaptor;
47
48     KahaTransactionStore(KahaPersistenceAdapter adaptor,Map JavaDoc preparedMap){
49         this.adaptor=adaptor;
50         this.prepared=preparedMap;
51     }
52
53     public MessageStore proxy(MessageStore messageStore){
54         return new ProxyMessageStore(messageStore){
55             public void addMessage(ConnectionContext context,final Message send) throws IOException JavaDoc{
56                 KahaTransactionStore.this.addMessage(getDelegate(),send);
57             }
58
59             public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException JavaDoc{
60                 KahaTransactionStore.this.removeMessage(getDelegate(),ack);
61             }
62         };
63     }
64
65     public TopicMessageStore proxy(TopicMessageStore messageStore){
66         return new ProxyTopicMessageStore(messageStore){
67             public void addMessage(ConnectionContext context,final Message send) throws IOException JavaDoc{
68                 KahaTransactionStore.this.addMessage(getDelegate(),send);
69             }
70
71             public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException JavaDoc{
72                 KahaTransactionStore.this.removeMessage(getDelegate(),ack);
73             }
74         };
75     }
76
77     /**
78      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
79      */

80     public void prepare(TransactionId txid){
81         KahaTransaction tx=getTx(txid);
82         if(tx!=null){
83             tx.prepare();
84             prepared.put(txid,tx);
85         }
86     }
87
88     /**
89      * @throws XAException
90      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
91      */

92     public void commit(TransactionId txid,boolean wasPrepared) throws IOException JavaDoc{
93         KahaTransaction tx=getTx(txid);
94         if(tx!=null){
95             tx.commit(this);
96             removeTx(txid);
97         }
98     }
99
100     /**
101      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
102      */

103     public void rollback(TransactionId txid){
104         KahaTransaction tx=getTx(txid);
105         if(tx!=null){
106             tx.rollback();
107             removeTx(txid);
108         }
109     }
110
111     public void start() throws Exception JavaDoc{}
112
113     public void stop() throws Exception JavaDoc{}
114
115     synchronized public void recover(TransactionRecoveryListener listener) throws IOException JavaDoc{
116         for(Iterator JavaDoc i=prepared.entrySet().iterator();i.hasNext();){
117             Map.Entry JavaDoc entry=(Entry) i.next();
118             XATransactionId xid=(XATransactionId) entry.getKey();
119             KahaTransaction kt=(KahaTransaction) entry.getValue();
120             listener.recover(xid,kt.getMessages(),kt.getAcks());
121         }
122     }
123
124     /**
125      * @param message
126      * @throws IOException
127      */

128     void addMessage(final MessageStore destination,final Message message) throws IOException JavaDoc{
129         if(message.isInTransaction()){
130             KahaTransaction tx=getOrCreateTx(message.getTransactionId());
131             tx.add((KahaMessageStore) destination,message);
132         }else{
133             destination.addMessage(null,message);
134         }
135     }
136
137     /**
138      * @param ack
139      * @throws IOException
140      */

141     private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException JavaDoc{
142         if(ack.isInTransaction()){
143             KahaTransaction tx=getOrCreateTx(ack.getTransactionId());
144             tx.add((KahaMessageStore) destination,ack);
145         }else{
146             destination.removeMessage(null,ack);
147         }
148     }
149
150     protected synchronized KahaTransaction getTx(TransactionId key){
151         KahaTransaction result=(KahaTransaction) transactions.get(key);
152         if(result==null){
153             result=(KahaTransaction) prepared.get(key);
154         }
155         return result;
156     }
157
158     protected synchronized KahaTransaction getOrCreateTx(TransactionId key){
159         KahaTransaction result=(KahaTransaction) transactions.get(key);
160         if(result==null){
161             result=new KahaTransaction();
162             transactions.put(key,result);
163         }
164         return result;
165     }
166
167     protected synchronized void removeTx(TransactionId key){
168         transactions.remove(key);
169         prepared.remove(key);
170     }
171
172     public void delete(){
173         transactions.clear();
174         prepared.clear();
175     }
176
177     protected MessageStore getStoreById(Object JavaDoc id){
178         return adaptor.retrieveMessageStore(id);
179     }
180 }
181
Popular Tags