KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > kahadaptor > KahaReferenceStoreAdapter


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.store.kahadaptor;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

42 public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
43     private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
44    private static final String JavaDoc STORE_STATE = "store-state";
45    private static final String JavaDoc RECORD_REFERENCES = "record-references";
46     private MapContainer stateMap;
47     private Map JavaDoc<Integer JavaDoc,AtomicInteger JavaDoc>recordReferences = new HashMap JavaDoc<Integer JavaDoc,AtomicInteger JavaDoc>();
48     private boolean storeValid;
49
50     
51     public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException JavaDoc{
52         throw new RuntimeException JavaDoc("Use createQueueReferenceStore instead");
53     }
54
55     public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException JavaDoc{
56         throw new RuntimeException JavaDoc("Use createTopicReferenceStore instead");
57     }
58     
59     @Override JavaDoc
60     public void start() throws Exception JavaDoc{
61         super.start();
62         Store store=getStore();
63         boolean empty=store.getMapContainerIds().isEmpty();
64         stateMap=store.getMapContainer("state",STORE_STATE);
65         stateMap.load();
66         if(!empty){
67             
68             AtomicBoolean JavaDoc status=(AtomicBoolean JavaDoc)stateMap.get(STORE_STATE);
69             if(status!=null){
70                 storeValid=status.get();
71             }
72            
73             if(storeValid){
74                 if(stateMap.containsKey(RECORD_REFERENCES)){
75                     recordReferences=(Map JavaDoc<Integer JavaDoc,AtomicInteger JavaDoc>)stateMap.get(RECORD_REFERENCES);
76                 }
77             }else {
78                 /*
79                 log.warn("Store Not shutdown cleanly - clearing out unsafe records ...");
80                 Set<ContainerId> set = store.getListContainerIds();
81                 for (ContainerId cid:set) {
82                     if (!cid.getDataContainerName().equals(STORE_STATE)) {
83                         store.deleteListContainer(cid);
84                     }
85                 }
86                 set = store.getMapContainerIds();
87                 for (ContainerId cid:set) {
88                     if (!cid.getDataContainerName().equals(STORE_STATE)) {
89                         store.deleteMapContainer(cid);
90                     }
91                 }
92                 */

93                 buildReferenceFileIdsInUse();
94             }
95             
96         }
97         stateMap.put(STORE_STATE,new AtomicBoolean JavaDoc());
98     }
99     
100     @Override JavaDoc
101     public void stop() throws Exception JavaDoc {
102         stateMap.put(RECORD_REFERENCES,recordReferences);
103         stateMap.put(STORE_STATE,new AtomicBoolean JavaDoc(true));
104         super.stop();
105     }
106     
107     
108     public boolean isStoreValid() {
109         return storeValid;
110     }
111     
112
113     public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException JavaDoc {
114         ReferenceStore rc=(ReferenceStore)queues.get(destination);
115         if(rc==null){
116             rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
117             messageStores.put(destination,rc);
118 // if(transactionStore!=null){
119
// rc=transactionStore.proxy(rc);
120
// }
121
queues.put(destination,rc);
122         }
123         return rc;
124     }
125
126     public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException JavaDoc {
127         TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
128         if(rc==null){
129             Store store=getStore();
130             MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
131             MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob");
132             ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
133             ackContainer.setMarshaller(new TopicSubAckMarshaller());
134             rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
135             messageStores.put(destination,rc);
136 // if(transactionStore!=null){
137
// rc=transactionStore.proxy(rc);
138
// }
139
topics.put(destination,rc);
140         }
141         return rc;
142     }
143
144     public void buildReferenceFileIdsInUse() throws IOException JavaDoc {
145         
146         recordReferences = new HashMap JavaDoc<Integer JavaDoc,AtomicInteger JavaDoc>();
147         
148         Set JavaDoc<ActiveMQDestination> destinations = getDestinations();
149         for (ActiveMQDestination destination : destinations) {
150             if( destination.isQueue() ) {
151                 KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
152                 store.addReferenceFileIdsInUse();
153             } else {
154                 KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
155                 store.addReferenceFileIdsInUse();
156             }
157         }
158     }
159     
160         
161     protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object JavaDoc id,String JavaDoc containerName) throws IOException JavaDoc{
162         Store store=getStore();
163         MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);
164         container.setKeyMarshaller(new MessageIdMarshaller());
165         container.setValueMarshaller(new ReferenceRecordMarshaller());
166         container.load();
167         return container;
168     }
169     
170     synchronized void addInterestInRecordFile(int recordNumber) {
171         Integer JavaDoc key = new Integer JavaDoc(recordNumber);
172         AtomicInteger JavaDoc rr = recordReferences.get(key);
173         if (rr == null) {
174             rr = new AtomicInteger JavaDoc();
175             recordReferences.put(key,rr);
176         }
177         rr.incrementAndGet();
178     }
179     
180     synchronized void removeInterestInRecordFile(int recordNumber) {
181         Integer JavaDoc key = new Integer JavaDoc(recordNumber);
182         AtomicInteger JavaDoc rr = recordReferences.get(key);
183         if (rr != null && rr.decrementAndGet() <= 0) {
184             recordReferences.remove(key);
185         }
186     }
187
188     /**
189      * @return
190      * @throws IOException
191      * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
192      */

193     public Set JavaDoc<Integer JavaDoc> getReferenceFileIdsInUse() throws IOException JavaDoc{
194         return recordReferences.keySet();
195     }
196
197     
198     
199 }
200
Popular Tags