1 14 15 package org.apache.activemq.store.kahadaptor; 16 17 import java.io.IOException ; 18 import org.apache.activemq.broker.ConnectionContext; 19 import org.apache.activemq.command.ActiveMQDestination; 20 import org.apache.activemq.command.Message; 21 import org.apache.activemq.command.MessageAck; 22 import org.apache.activemq.command.MessageId; 23 import org.apache.activemq.kaha.MapContainer; 24 import org.apache.activemq.kaha.StoreEntry; 25 import org.apache.activemq.memory.UsageManager; 26 import org.apache.activemq.store.MessageRecoveryListener; 27 import org.apache.activemq.store.MessageStore; 28 29 34 public class KahaMessageStore implements MessageStore{ 35 36 protected final ActiveMQDestination destination; 37 protected final MapContainer<MessageId,Message> messageContainer; 38 protected StoreEntry batchEntry=null; 39 40 public KahaMessageStore(MapContainer<MessageId,Message> container,ActiveMQDestination destination) 41 throws IOException { 42 this.messageContainer=container; 43 this.destination=destination; 44 } 45 46 protected MessageId getMessageId(Object object){ 47 return ((Message)object).getMessageId(); 48 } 49 50 public Object getId(){ 51 return messageContainer.getId(); 52 } 53 54 public synchronized void addMessage(ConnectionContext context,Message message) throws IOException { 55 messageContainer.put(message.getMessageId(),message); 56 } 63 64 public synchronized Message getMessage(MessageId identity) throws IOException { 65 Message result=messageContainer.get(identity); 66 return result; 67 } 68 69 protected void recover(MessageRecoveryListener listener,Object msg) throws Exception { 70 listener.recoverMessage((Message)msg); 71 } 72 73 public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException { 74 removeMessage(ack.getLastMessageId()); 75 } 76 77 78 79 public synchronized void removeMessage(MessageId msgId) throws IOException { 80 StoreEntry entry=messageContainer.getEntry(msgId); 81 if(entry!=null){ 82 messageContainer.remove(entry); 83 if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){ 84 resetBatching(); 85 } 86 } 87 } 88 89 public synchronized void recover(MessageRecoveryListener listener) throws Exception { 90 for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ 91 Message msg=(Message)messageContainer.getValue(entry); 92 recover(listener,msg); 93 } 94 listener.finished(); 95 } 96 97 public void start(){ 98 } 99 100 public void stop(){ 101 } 102 103 public synchronized void removeAllMessages(ConnectionContext context) throws IOException { 104 messageContainer.clear(); 105 } 106 107 public ActiveMQDestination getDestination(){ 108 return destination; 109 } 110 111 public synchronized void delete(){ 112 messageContainer.clear(); 113 } 114 115 118 public void setUsageManager(UsageManager usageManager){ 119 } 120 121 125 public int getMessageCount(){ 126 return messageContainer.size(); 127 } 128 129 135 public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception { 136 return null; 137 } 138 139 147 public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception { 148 StoreEntry entry=batchEntry; 149 if(entry==null){ 150 entry=messageContainer.getFirst(); 151 }else{ 152 entry=messageContainer.refresh(entry); 153 entry=messageContainer.getNext(entry); 154 if(entry==null){ 155 batchEntry=null; 156 } 157 } 158 if(entry!=null){ 159 int count=0; 160 do{ 161 Object msg=messageContainer.getValue(entry); 162 if(msg!=null){ 163 recover(listener,msg); 164 count++; 165 } 166 batchEntry=entry; 167 entry=messageContainer.getNext(entry); 168 }while(entry!=null&&count<maxReturned&&listener.hasSpace()); 169 } 170 listener.finished(); 171 } 172 173 177 public void resetBatching(){ 178 batchEntry=null; 179 } 180 181 184 public boolean isSupportForCursors(){ 185 return true; 186 } 187 } 188 | Popular Tags |