package org.apache.activemq.store.memory;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link PersistenceAdapter} that keeps its queue and topic message stores
 * in in-process maps keyed by destination.
 * <p>
 * The transaction callbacks, lifecycle methods and most configuration setters
 * are intentionally empty in this implementation.
 */
public class MemoryPersistenceAdapter implements PersistenceAdapter {
    private static final Log log = LogFactory.getLog(MemoryPersistenceAdapter.class);

    /** Lazily created by {@link #createTransactionStore()}; {@code null} until then. */
    MemoryTransactionStore transactionStore;
    /** Topic stores keyed by their {@link ActiveMQTopic}. */
    ConcurrentHashMap topics = new ConcurrentHashMap();
    /** Queue stores keyed by their {@link ActiveMQQueue}. */
    ConcurrentHashMap queues = new ConcurrentHashMap();
    /** Plain configuration flag; it is stored here but not consulted by this class. */
    private boolean useExternalMessageReferences;

    /**
     * Returns a snapshot of every destination that currently has a store.
     *
     * @return a new, caller-owned set holding the keys of both the queue and
     *         the topic maps
     */
    public Set getDestinations() {
        Set rc = new HashSet(queues.size() + topics.size());
        rc.addAll(queues.keySet());
        rc.addAll(topics.keySet());
        return rc;
    }

    /**
     * Factory method that creates a fresh adapter.
     *
     * @param file ignored by this implementation
     * @return a new adapter instance
     */
    public static MemoryPersistenceAdapter newInstance(File file) {
        return new MemoryPersistenceAdapter();
    }

    /**
     * Returns the store registered for the given queue, creating and
     * registering one on first use. If a transaction store has already been
     * created, the new store is wrapped with its proxy before registration.
     *
     * @param destination the queue whose store is requested
     * @return the single store registered for {@code destination}
     * @throws IOException declared by the interface; not thrown directly here
     */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        MessageStore rc = (MessageStore) queues.get(destination);
        if (rc == null) {
            MessageStore created = new MemoryMessageStore(destination);
            if (transactionStore != null) {
                created = transactionStore.proxy(created);
            }
            // putIfAbsent closes the get/put race: when two callers create a
            // store for the same destination concurrently, both must end up
            // with the instance that actually got registered.
            MessageStore winner = (MessageStore) queues.putIfAbsent(destination, created);
            rc = (winner != null) ? winner : created;
        }
        return rc;
    }

    /**
     * Returns the store registered for the given topic, creating and
     * registering one on first use. If a transaction store has already been
     * created, the new store is wrapped with its proxy before registration.
     *
     * @param destination the topic whose store is requested
     * @return the single store registered for {@code destination}
     * @throws IOException declared by the interface; not thrown directly here
     */
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        TopicMessageStore rc = (TopicMessageStore) topics.get(destination);
        if (rc == null) {
            TopicMessageStore created = new MemoryTopicMessageStore(destination);
            if (transactionStore != null) {
                created = transactionStore.proxy(created);
            }
            // Same reasoning as for queues: never hand out a store that lost
            // the registration race.
            TopicMessageStore winner = (TopicMessageStore) topics.putIfAbsent(destination, created);
            rc = (winner != null) ? winner : created;
        }
        return rc;
    }

    /**
     * Returns the transaction store, creating it on the first call.
     *
     * @return the adapter's {@link MemoryTransactionStore}
     * @throws IOException declared by the interface; not thrown directly here
     */
    public TransactionStore createTransactionStore() throws IOException {
        if (transactionStore == null) {
            transactionStore = new MemoryTransactionStore();
        }
        return transactionStore;
    }

    /** No-op in this implementation. */
    public void beginTransaction(ConnectionContext context) {
    }

    /** No-op in this implementation. */
    public void commitTransaction(ConnectionContext context) {
    }

    /** No-op in this implementation. */
    public void rollbackTransaction(ConnectionContext context) {
    }

    /** No-op in this implementation. */
    public void start() throws Exception {
    }

    /** No-op in this implementation. */
    public void stop() throws Exception {
    }

    /**
     * @return always {@code 0}
     */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }

    /**
     * Calls {@code delete()} on every registered topic and queue store that is
     * a {@link MemoryMessageStore}, then on the transaction store if one has
     * been created. The maps themselves keep their entries.
     * <p>
     * NOTE(review): stores created after {@link #createTransactionStore()} are
     * registered as the result of {@code transactionStore.proxy(...)}. If that
     * wrapper is not itself a {@code MemoryMessageStore}, such stores are only
     * logged and skipped here - confirm against the proxy implementation.
     *
     * @throws IOException declared by the interface; not thrown directly here
     */
    public void deleteAllMessages() throws IOException {
        deleteStores(topics.values().iterator());
        deleteStores(queues.values().iterator());

        if (transactionStore != null) {
            transactionStore.delete();
        }
    }

    /** Deletes each iterated value that {@link #asMemoryMessageStore(Object)} accepts. */
    private void deleteStores(Iterator iter) {
        while (iter.hasNext()) {
            MemoryMessageStore store = asMemoryMessageStore(iter.next());
            if (store != null) {
                store.delete();
            }
        }
    }

    /**
     * @return the current value of the external-message-references flag
     */
    public boolean isUseExternalMessageReferences() {
        return useExternalMessageReferences;
    }

    /**
     * @param useExternalMessageReferences new value of the flag
     */
    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
        this.useExternalMessageReferences = useExternalMessageReferences;
    }

    /**
     * Narrows a map value to {@link MemoryMessageStore}.
     *
     * @param value a value taken from the queue or topic map
     * @return {@code value} cast to {@code MemoryMessageStore}, or {@code null}
     *         (after logging a warning) when it is any other type
     */
    protected MemoryMessageStore asMemoryMessageStore(Object value) {
        if (value instanceof MemoryMessageStore) {
            return (MemoryMessageStore) value;
        }
        log.warn("Expected an instance of MemoryMessageStore but was: " + value);
        return null;
    }

    /**
     * No-op in this implementation.
     *
     * @param usageManager ignored
     */
    public void setUsageManager(UsageManager usageManager) {
    }

    /**
     * @return the fixed string {@code "MemoryPersistenceAdapter"}
     */
    public String toString() {
        return "MemoryPersistenceAdapter";
    }

    /**
     * No-op in this implementation.
     *
     * @param brokerName ignored
     */
    public void setBrokerName(String brokerName) {
    }

    /**
     * No-op in this implementation.
     *
     * @param dir ignored
     */
    public void setDirectory(File dir) {
    }

    /**
     * No-op in this implementation.
     *
     * @param sync ignored
     */
    public void checkpoint(boolean sync) throws IOException {
    }
}