1 14 15 package org.apache.activemq.store.memory; 16 17 import java.io.IOException ; 18 import java.util.Collections ; 19 import java.util.Iterator ; 20 import java.util.LinkedHashMap ; 21 import java.util.Map ; 22 import java.util.Map.Entry; 23 24 import org.apache.activemq.broker.ConnectionContext; 25 import org.apache.activemq.command.ActiveMQDestination; 26 import org.apache.activemq.command.Message; 27 import org.apache.activemq.command.MessageAck; 28 import org.apache.activemq.command.MessageId; 29 import org.apache.activemq.memory.UsageManager; 30 import org.apache.activemq.store.MessageRecoveryListener; 31 import org.apache.activemq.store.MessageStore; 32 33 38 public class MemoryMessageStore implements MessageStore{ 39 40 protected final ActiveMQDestination destination; 41 protected final Map messageTable; 42 protected MessageId lastBatchId; 43 44 public MemoryMessageStore(ActiveMQDestination destination){ 45 this(destination,new LinkedHashMap ()); 46 } 47 48 public MemoryMessageStore(ActiveMQDestination destination,Map messageTable){ 49 this.destination=destination; 50 this.messageTable=Collections.synchronizedMap(messageTable); 51 } 52 53 public synchronized void addMessage(ConnectionContext context,Message message) throws IOException { 54 synchronized(messageTable){ 55 messageTable.put(message.getMessageId(),message); 56 } 57 } 58 59 66 public Message getMessage(MessageId identity) throws IOException { 67 return (Message)messageTable.get(identity); 68 } 69 70 74 public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException { 75 removeMessage(ack.getLastMessageId()); 76 } 77 78 public void removeMessage(MessageId msgId) throws IOException { 79 synchronized(messageTable){ 80 messageTable.remove(msgId); 81 if((lastBatchId!=null && lastBatchId.equals(msgId)) || messageTable.isEmpty()){ 82 lastBatchId=null; 83 } 84 } 85 } 86 87 public void recover(MessageRecoveryListener listener) throws Exception { 88 synchronized(messageTable){ 90 for(Iterator iter=messageTable.values().iterator();iter.hasNext();){ 91 Object msg=(Object )iter.next(); 92 if(msg.getClass()==MessageId.class){ 93 listener.recoverMessageReference((MessageId)msg); 94 }else{ 95 listener.recoverMessage((Message)msg); 96 } 97 } 98 listener.finished(); 99 } 100 } 101 102 public void start(){ 103 } 104 105 public void stop(){ 106 } 107 108 public void removeAllMessages(ConnectionContext context) throws IOException { 109 synchronized(messageTable){ 110 messageTable.clear(); 111 } 112 } 113 114 public ActiveMQDestination getDestination(){ 115 return destination; 116 } 117 118 public void delete(){ 119 synchronized(messageTable){ 120 messageTable.clear(); 121 } 122 } 123 124 127 public void setUsageManager(UsageManager usageManager){ 128 } 129 130 public int getMessageCount(){ 131 return messageTable.size(); 132 } 133 134 public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception { 135 synchronized(messageTable){ 136 boolean pastLackBatch=lastBatchId==null; 137 int count=0; 138 for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ 139 Map.Entry entry=(Entry)iter.next(); 140 if(pastLackBatch){ 141 count++; 142 Object msg=entry.getValue(); 143 lastBatchId=(MessageId)entry.getKey(); 144 if(msg.getClass()==MessageId.class){ 145 listener.recoverMessageReference((MessageId)msg); 146 }else{ 147 listener.recoverMessage((Message)msg); 148 } 149 }else{ 150 pastLackBatch=entry.getKey().equals(lastBatchId); 151 } 152 } 153 listener.finished(); 154 } 155 } 156 157 public void resetBatching(){ 158 lastBatchId=null; 159 } 160 } 161 | Popular Tags |