KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > kahadaptor > KahaPersistenceAdapter


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

14
15 package org.apache.activemq.store.kahadaptor;
16
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
47
48 /**
49  * @org.apache.xbean.XBean
50  *
51  * @version $Revision: 1.4 $
52  */

53 public class KahaPersistenceAdapter implements PersistenceAdapter{
54
55     private static final int STORE_LOCKED_WAIT_DELAY=10*1000;
56     private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
57     static final String JavaDoc PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
58     KahaTransactionStore transactionStore;
59     ConcurrentHashMap JavaDoc<ActiveMQTopic,TopicMessageStore> topics=new ConcurrentHashMap JavaDoc<ActiveMQTopic,TopicMessageStore>();
60     ConcurrentHashMap JavaDoc<ActiveMQQueue,MessageStore> queues=new ConcurrentHashMap JavaDoc<ActiveMQQueue,MessageStore>();
61     ConcurrentHashMap JavaDoc<ActiveMQDestination,MessageStore> messageStores=new ConcurrentHashMap JavaDoc<ActiveMQDestination,MessageStore>();
62     protected OpenWireFormat wireFormat=new OpenWireFormat();
63     private long maxDataFileLength=32*1024*1024;
64     private File JavaDoc directory;
65     private String JavaDoc brokerName;
66     private Store theStore;
67     private boolean initialized;
68
69     public Set JavaDoc<ActiveMQDestination> getDestinations(){
70         Set JavaDoc<ActiveMQDestination> rc=new HashSet JavaDoc<ActiveMQDestination>();
71         try{
72             Store store=getStore();
73             for(Iterator JavaDoc i=store.getMapContainerIds().iterator();i.hasNext();){
74                 Object JavaDoc obj=i.next();
75                 if(obj instanceof ActiveMQDestination){
76                     rc.add((ActiveMQDestination)obj);
77                 }
78             }
79         }catch(IOException JavaDoc e){
80             log.error("Failed to get destinations ",e);
81         }
82         return rc;
83     }
84
85     public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException JavaDoc{
86         MessageStore rc=queues.get(destination);
87         if(rc==null){
88             rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
89             messageStores.put(destination,rc);
90             if(transactionStore!=null){
91                 rc=transactionStore.proxy(rc);
92             }
93             queues.put(destination,rc);
94         }
95         return rc;
96     }
97
98     public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException JavaDoc{
99         TopicMessageStore rc=topics.get(destination);
100         if(rc==null){
101             Store store=getStore();
102             MapContainer messageContainer=getMapContainer(destination,"topic-data");
103             MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs");
104             ListContainer<TopicSubAck> ackContainer=store.getListContainer(destination.toString(),"topic-acks");
105             ackContainer.setMarshaller(new TopicSubAckMarshaller());
106             rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
107             messageStores.put(destination,rc);
108             if(transactionStore!=null){
109                 rc=transactionStore.proxy(rc);
110             }
111             topics.put(destination,rc);
112         }
113         return rc;
114     }
115
116     protected MessageStore retrieveMessageStore(Object JavaDoc id){
117         MessageStore result=messageStores.get(id);
118         return result;
119     }
120
121     public TransactionStore createTransactionStore() throws IOException JavaDoc{
122         if(transactionStore==null){
123             while(true){
124                 try{
125                     Store store=getStore();
126                     MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
127                     container.setKeyMarshaller(new CommandMarshaller(wireFormat));
128                     container.setValueMarshaller(new TransactionMarshaller(wireFormat));
129                     container.load();
130                     transactionStore=new KahaTransactionStore(this,container);
131                     break;
132                 }catch(StoreLockedExcpetion e){
133                     log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000)
134                             +" seconds for the Store to be unlocked.");
135                     try{
136                         Thread.sleep(STORE_LOCKED_WAIT_DELAY);
137                     }catch(InterruptedException JavaDoc e1){
138                     }
139                 }
140             }
141         }
142         return transactionStore;
143     }
144
145     public void beginTransaction(ConnectionContext context){
146     }
147
148     public void commitTransaction(ConnectionContext context) throws IOException JavaDoc{
149         if(theStore!=null){
150             theStore.force();
151         }
152     }
153
154     public void rollbackTransaction(ConnectionContext context){
155     }
156
157     public void start() throws Exception JavaDoc{
158         initialize();
159     }
160
161     public void stop() throws Exception JavaDoc{
162         if(theStore!=null){
163             theStore.close();
164         }
165     }
166
167     public long getLastMessageBrokerSequenceId() throws IOException JavaDoc{
168         return 0;
169     }
170
171     public void deleteAllMessages() throws IOException JavaDoc{
172         if(theStore!=null){
173             if(theStore.isInitialized()){
174                 theStore.clear();
175             }else{
176                 theStore.delete();
177             }
178         }else{
179             StoreFactory.delete(getStoreName());
180         }
181     }
182
183     protected MapContainer<MessageId,Message> getMapContainer(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
184         Store store=getStore();
185         MapContainer<MessageId,Message> container=store.getMapContainer(id,containerName);
186         container.setKeyMarshaller(new MessageIdMarshaller());
187         container.setValueMarshaller(new MessageMarshaller(wireFormat));
188         container.load();
189         return container;
190     }
191
192     protected MapContainer<String JavaDoc,Object JavaDoc> getSubsMapContainer(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
193         Store store=getStore();
194         MapContainer<String JavaDoc,Object JavaDoc> container=store.getMapContainer(id,containerName);
195         container.setKeyMarshaller(Store.StringMarshaller);
196         container.setValueMarshaller(createMessageMarshaller());
197         container.load();
198         return container;
199     }
200
201     protected Marshaller<Object JavaDoc> createMessageMarshaller(){
202         return new CommandMarshaller(wireFormat);
203     }
204
205     protected ListContainer getListContainer(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
206         Store store=getStore();
207         ListContainer container=store.getListContainer(id,containerName);
208         container.setMarshaller(createMessageMarshaller());
209         container.load();
210         return container;
211     }
212
213     /**
214      * @param usageManager The UsageManager that is controlling the broker's memory usage.
215      */

216     public void setUsageManager(UsageManager usageManager){
217     }
218
219     /**
220      * @return the maxDataFileLength
221      */

222     public long getMaxDataFileLength(){
223         return maxDataFileLength;
224     }
225
226     /**
227      * @param maxDataFileLength the maxDataFileLength to set
228      *
229      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
230      */

231     public void setMaxDataFileLength(long maxDataFileLength){
232         this.maxDataFileLength=maxDataFileLength;
233     }
234
235     protected synchronized Store getStore() throws IOException JavaDoc{
236         if(theStore==null){
237             theStore=StoreFactory.open(getStoreName(),"rw");
238             theStore.setMaxDataFileLength(maxDataFileLength);
239         }
240         return theStore;
241     }
242
243     private String JavaDoc getStoreName(){
244         initialize();
245         return directory.getAbsolutePath();
246     }
247
248     public String JavaDoc toString(){
249         return "KahaPersistenceAdapter("+getStoreName()+")";
250     }
251
252     public void setBrokerName(String JavaDoc brokerName){
253         this.brokerName=brokerName;
254     }
255     
256     public String JavaDoc getBrokerName(){
257         return brokerName;
258     }
259
260     public File JavaDoc getDirectory(){
261         return this.directory;
262     }
263
264     public void setDirectory(File JavaDoc directory){
265         this.directory=directory;
266     }
267   
268     public void checkpoint(boolean sync) throws IOException JavaDoc{
269         if(sync){
270             getStore().force();
271         }
272     }
273
274     private void initialize(){
275         if(!initialized){
276             initialized=true;
277             if(this.directory==null){
278                 this.directory=new File JavaDoc(IOHelper.getDefaultDataDirectory());
279                 this.directory=new File JavaDoc(this.directory,brokerName+"-kahastore");
280             }
281             this.directory.mkdirs();
282             wireFormat.setCacheEnabled(false);
283             wireFormat.setTightEncodingEnabled(true);
284         }
285     }
286
287    
288 }
289
Popular Tags