1 18 package org.apache.activemq.store.kahadaptor; 19 20 import java.io.IOException ; 21 import java.util.HashMap ; 22 import java.util.Map ; 23 import java.util.Set ; 24 import java.util.concurrent.atomic.AtomicBoolean ; 25 import java.util.concurrent.atomic.AtomicInteger ; 26 import org.apache.activemq.command.ActiveMQDestination; 27 import org.apache.activemq.command.ActiveMQQueue; 28 import org.apache.activemq.command.ActiveMQTopic; 29 import org.apache.activemq.command.MessageId; 30 import org.apache.activemq.kaha.ListContainer; 31 import org.apache.activemq.kaha.MapContainer; 32 import org.apache.activemq.kaha.MessageIdMarshaller; 33 import org.apache.activemq.kaha.Store; 34 import org.apache.activemq.store.MessageStore; 35 import org.apache.activemq.store.ReferenceStore; 36 import org.apache.activemq.store.ReferenceStoreAdapter; 37 import org.apache.activemq.store.TopicMessageStore; 38 import org.apache.activemq.store.TopicReferenceStore; 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 42 public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter { 43 private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class); 44 private static final String STORE_STATE = "store-state"; 45 private static final String RECORD_REFERENCES = "record-references"; 46 private MapContainer stateMap; 47 private Map <Integer ,AtomicInteger >recordReferences = new HashMap <Integer ,AtomicInteger >(); 48 private boolean storeValid; 49 50 51 public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 52 throw new RuntimeException ("Use createQueueReferenceStore instead"); 53 } 54 55 public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 56 throw new RuntimeException ("Use createTopicReferenceStore instead"); 57 } 58 59 @Override 60 public void start() throws Exception { 61 super.start(); 62 Store store=getStore(); 63 boolean empty=store.getMapContainerIds().isEmpty(); 64 stateMap=store.getMapContainer("state",STORE_STATE); 65 stateMap.load(); 66 if(!empty){ 67 68 AtomicBoolean status=(AtomicBoolean )stateMap.get(STORE_STATE); 69 if(status!=null){ 70 storeValid=status.get(); 71 } 72 73 if(storeValid){ 74 if(stateMap.containsKey(RECORD_REFERENCES)){ 75 recordReferences=(Map <Integer ,AtomicInteger >)stateMap.get(RECORD_REFERENCES); 76 } 77 }else { 78 93 buildReferenceFileIdsInUse(); 94 } 95 96 } 97 stateMap.put(STORE_STATE,new AtomicBoolean ()); 98 } 99 100 @Override 101 public void stop() throws Exception { 102 stateMap.put(RECORD_REFERENCES,recordReferences); 103 stateMap.put(STORE_STATE,new AtomicBoolean (true)); 104 super.stop(); 105 } 106 107 108 public boolean isStoreValid() { 109 return storeValid; 110 } 111 112 113 public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException { 114 ReferenceStore rc=(ReferenceStore)queues.get(destination); 115 if(rc==null){ 116 rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination); 117 messageStores.put(destination,rc); 118 queues.put(destination,rc); 122 } 123 return rc; 124 } 125 126 public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException { 127 TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination); 128 if(rc==null){ 129 Store store=getStore(); 130 MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data"); 131 MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob"); 132 ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks"); 133 ackContainer.setMarshaller(new TopicSubAckMarshaller()); 134 rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination); 135 messageStores.put(destination,rc); 136 topics.put(destination,rc); 140 } 141 return rc; 142 } 143 144 public void buildReferenceFileIdsInUse() throws IOException { 145 146 recordReferences = new HashMap <Integer ,AtomicInteger >(); 147 148 Set <ActiveMQDestination> destinations = getDestinations(); 149 for (ActiveMQDestination destination : destinations) { 150 if( destination.isQueue() ) { 151 KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination); 152 store.addReferenceFileIdsInUse(); 153 } else { 154 KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination); 155 store.addReferenceFileIdsInUse(); 156 } 157 } 158 } 159 160 161 protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException { 162 Store store=getStore(); 163 MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName); 164 container.setKeyMarshaller(new MessageIdMarshaller()); 165 container.setValueMarshaller(new ReferenceRecordMarshaller()); 166 container.load(); 167 return container; 168 } 169 170 synchronized void addInterestInRecordFile(int recordNumber) { 171 Integer key = new Integer (recordNumber); 172 AtomicInteger rr = recordReferences.get(key); 173 if (rr == null) { 174 rr = new AtomicInteger (); 175 recordReferences.put(key,rr); 176 } 177 rr.incrementAndGet(); 178 } 179 180 synchronized void removeInterestInRecordFile(int recordNumber) { 181 Integer key = new Integer (recordNumber); 182 AtomicInteger rr = recordReferences.get(key); 183 if (rr != null && rr.decrementAndGet() <= 0) { 184 recordReferences.remove(key); 185 } 186 } 187 188 193 public Set <Integer > getReferenceFileIdsInUse() throws IOException { 194 return recordReferences.keySet(); 195 } 196 197 198 199 } 200 | Popular Tags |