1 14 15 package org.apache.activemq.store.kahadaptor; 16 17 import java.io.File ; 18 import java.io.IOException ; 19 import java.util.HashSet ; 20 import java.util.Iterator ; 21 import java.util.Set ; 22 import java.util.concurrent.ConcurrentHashMap ; 23 import org.apache.activemq.broker.ConnectionContext; 24 import org.apache.activemq.command.ActiveMQDestination; 25 import org.apache.activemq.command.ActiveMQQueue; 26 import org.apache.activemq.command.ActiveMQTopic; 27 import org.apache.activemq.command.Message; 28 import org.apache.activemq.command.MessageId; 29 import org.apache.activemq.kaha.CommandMarshaller; 30 import org.apache.activemq.kaha.ListContainer; 31 import org.apache.activemq.kaha.MapContainer; 32 import org.apache.activemq.kaha.Marshaller; 33 import org.apache.activemq.kaha.MessageIdMarshaller; 34 import org.apache.activemq.kaha.MessageMarshaller; 35 import org.apache.activemq.kaha.Store; 36 import org.apache.activemq.kaha.StoreFactory; 37 import org.apache.activemq.kaha.impl.StoreLockedExcpetion; 38 import org.apache.activemq.memory.UsageManager; 39 import org.apache.activemq.openwire.OpenWireFormat; 40 import org.apache.activemq.store.MessageStore; 41 import org.apache.activemq.store.PersistenceAdapter; 42 import org.apache.activemq.store.TopicMessageStore; 43 import org.apache.activemq.store.TransactionStore; 44 import org.apache.activemq.util.IOHelper; 45 import org.apache.commons.logging.Log; 46 import org.apache.commons.logging.LogFactory; 47 48 53 public class KahaPersistenceAdapter implements PersistenceAdapter{ 54 55 private static final int STORE_LOCKED_WAIT_DELAY=10*1000; 56 private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class); 57 static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions"; 58 KahaTransactionStore transactionStore; 59 ConcurrentHashMap <ActiveMQTopic,TopicMessageStore> topics=new ConcurrentHashMap <ActiveMQTopic,TopicMessageStore>(); 60 ConcurrentHashMap <ActiveMQQueue,MessageStore> queues=new ConcurrentHashMap <ActiveMQQueue,MessageStore>(); 61 ConcurrentHashMap <ActiveMQDestination,MessageStore> messageStores=new ConcurrentHashMap <ActiveMQDestination,MessageStore>(); 62 protected OpenWireFormat wireFormat=new OpenWireFormat(); 63 private long maxDataFileLength=32*1024*1024; 64 private File directory; 65 private String brokerName; 66 private Store theStore; 67 private boolean initialized; 68 69 public Set <ActiveMQDestination> getDestinations(){ 70 Set <ActiveMQDestination> rc=new HashSet <ActiveMQDestination>(); 71 try{ 72 Store store=getStore(); 73 for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ 74 Object obj=i.next(); 75 if(obj instanceof ActiveMQDestination){ 76 rc.add((ActiveMQDestination)obj); 77 } 78 } 79 }catch(IOException e){ 80 log.error("Failed to get destinations ",e); 81 } 82 return rc; 83 } 84 85 public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 86 MessageStore rc=queues.get(destination); 87 if(rc==null){ 88 rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination); 89 messageStores.put(destination,rc); 90 if(transactionStore!=null){ 91 rc=transactionStore.proxy(rc); 92 } 93 queues.put(destination,rc); 94 } 95 return rc; 96 } 97 98 public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 99 TopicMessageStore rc=topics.get(destination); 100 if(rc==null){ 101 Store store=getStore(); 102 MapContainer messageContainer=getMapContainer(destination,"topic-data"); 103 MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs"); 104 ListContainer<TopicSubAck> ackContainer=store.getListContainer(destination.toString(),"topic-acks"); 105 ackContainer.setMarshaller(new TopicSubAckMarshaller()); 106 rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination); 107 messageStores.put(destination,rc); 108 if(transactionStore!=null){ 109 rc=transactionStore.proxy(rc); 110 } 111 topics.put(destination,rc); 112 } 113 return rc; 114 } 115 116 protected MessageStore retrieveMessageStore(Object id){ 117 MessageStore result=messageStores.get(id); 118 return result; 119 } 120 121 public TransactionStore createTransactionStore() throws IOException { 122 if(transactionStore==null){ 123 while(true){ 124 try{ 125 Store store=getStore(); 126 MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions"); 127 container.setKeyMarshaller(new CommandMarshaller(wireFormat)); 128 container.setValueMarshaller(new TransactionMarshaller(wireFormat)); 129 container.load(); 130 transactionStore=new KahaTransactionStore(this,container); 131 break; 132 }catch(StoreLockedExcpetion e){ 133 log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000) 134 +" seconds for the Store to be unlocked."); 135 try{ 136 Thread.sleep(STORE_LOCKED_WAIT_DELAY); 137 }catch(InterruptedException e1){ 138 } 139 } 140 } 141 } 142 return transactionStore; 143 } 144 145 public void beginTransaction(ConnectionContext context){ 146 } 147 148 public void commitTransaction(ConnectionContext context) throws IOException { 149 if(theStore!=null){ 150 theStore.force(); 151 } 152 } 153 154 public void rollbackTransaction(ConnectionContext context){ 155 } 156 157 public void start() throws Exception { 158 initialize(); 159 } 160 161 public void stop() throws Exception { 162 if(theStore!=null){ 163 theStore.close(); 164 } 165 } 166 167 public long getLastMessageBrokerSequenceId() throws IOException { 168 return 0; 169 } 170 171 public void deleteAllMessages() throws IOException { 172 if(theStore!=null){ 173 if(theStore.isInitialized()){ 174 theStore.clear(); 175 }else{ 176 theStore.delete(); 177 } 178 }else{ 179 StoreFactory.delete(getStoreName()); 180 } 181 } 182 183 protected MapContainer<MessageId,Message> getMapContainer(Object id,String containerName) throws IOException { 184 Store store=getStore(); 185 MapContainer<MessageId,Message> container=store.getMapContainer(id,containerName); 186 container.setKeyMarshaller(new MessageIdMarshaller()); 187 container.setValueMarshaller(new MessageMarshaller(wireFormat)); 188 container.load(); 189 return container; 190 } 191 192 protected MapContainer<String ,Object > getSubsMapContainer(Object id,String containerName) throws IOException { 193 Store store=getStore(); 194 MapContainer<String ,Object > container=store.getMapContainer(id,containerName); 195 container.setKeyMarshaller(Store.StringMarshaller); 196 container.setValueMarshaller(createMessageMarshaller()); 197 container.load(); 198 return container; 199 } 200 201 protected Marshaller<Object > createMessageMarshaller(){ 202 return new CommandMarshaller(wireFormat); 203 } 204 205 protected ListContainer getListContainer(Object id,String containerName) throws IOException { 206 Store store=getStore(); 207 ListContainer container=store.getListContainer(id,containerName); 208 container.setMarshaller(createMessageMarshaller()); 209 container.load(); 210 return container; 211 } 212 213 216 public void setUsageManager(UsageManager usageManager){ 217 } 218 219 222 public long getMaxDataFileLength(){ 223 return maxDataFileLength; 224 } 225 226 231 public void setMaxDataFileLength(long maxDataFileLength){ 232 this.maxDataFileLength=maxDataFileLength; 233 } 234 235 protected synchronized Store getStore() throws IOException { 236 if(theStore==null){ 237 theStore=StoreFactory.open(getStoreName(),"rw"); 238 theStore.setMaxDataFileLength(maxDataFileLength); 239 } 240 return theStore; 241 } 242 243 private String getStoreName(){ 244 initialize(); 245 return directory.getAbsolutePath(); 246 } 247 248 public String toString(){ 249 return "KahaPersistenceAdapter("+getStoreName()+")"; 250 } 251 252 public void setBrokerName(String brokerName){ 253 this.brokerName=brokerName; 254 } 255 256 public String getBrokerName(){ 257 return brokerName; 258 } 259 260 public File getDirectory(){ 261 return this.directory; 262 } 263 264 public void setDirectory(File directory){ 265 this.directory=directory; 266 } 267 268 public void checkpoint(boolean sync) throws IOException { 269 if(sync){ 270 getStore().force(); 271 } 272 } 273 274 private void initialize(){ 275 if(!initialized){ 276 initialized=true; 277 if(this.directory==null){ 278 this.directory=new File (IOHelper.getDefaultDataDirectory()); 279 this.directory=new File (this.directory,brokerName+"-kahastore"); 280 } 281 this.directory.mkdirs(); 282 wireFormat.setCacheEnabled(false); 283 wireFormat.setTightEncodingEnabled(true); 284 } 285 } 286 287 288 } 289 | Popular Tags |