package org.apache.activemq.store.kahadaptor;

import java.io.IOException;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;

/**
 * A {@link ReferenceStore} that keeps one {@link ReferenceRecord} per message id in a
 * {@link MapContainer}. Full messages are not stored here: {@link #addMessage} and
 * {@link #getMessage} always throw, and callers are expected to use the
 * {@code *MessageReference} methods instead.
 *
 * <p>Each stored record registers "interest" in the file id carried by its
 * {@link ReferenceData} with the owning {@link KahaReferenceStoreAdapter}; that interest is
 * released again when the record is removed.
 */
public class KahaReferenceStore implements ReferenceStore {

    protected final ActiveMQDestination destination;
    protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
    protected KahaReferenceStoreAdapter adapter;

    /** Last entry handed out by {@link #recoverNextMessages}; {@code null} means "start from the first entry". */
    private StoreEntry batchEntry = null;

    /**
     * Creates a store backed by the given container.
     *
     * @param adapter     adapter that tracks interest in record files
     * @param container   container holding the records; the raw type is kept so existing
     *                    callers continue to compile
     * @param destination destination this store belongs to
     * @throws IOException declared for callers; nothing in this constructor throws it
     */
    public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer container,
            ActiveMQDestination destination) throws IOException {
        this.adapter = adapter;
        this.messageContainer = container;
        this.destination = destination;
    }

    /** No-op. */
    public void start() {
    }

    /** No-op. */
    public void stop() {
    }

    /**
     * Builds a {@link MessageId} from a stored record.
     *
     * @param object a {@link ReferenceRecord}; any other type fails the cast
     * @return a new id created from the record's message id
     */
    protected MessageId getMessageId(Object object) {
        ReferenceRecord record = (ReferenceRecord) object;
        return new MessageId(record.getMessageId());
    }

    /**
     * Not supported by this store.
     *
     * @throws RuntimeException always
     */
    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
        throw new RuntimeException("Use addMessageReference instead");
    }

    /**
     * Not supported by this store.
     *
     * @throws RuntimeException always
     */
    public synchronized Message getMessage(MessageId identity) throws IOException {
        // The read-side replacement is getMessageReference, not addMessageReference.
        throw new RuntimeException("Use getMessageReference instead");
    }

    /**
     * Reports a single stored record to the listener as a message reference.
     *
     * @param listener receiver of the recovered reference
     * @param msg      a {@link ReferenceRecord}; any other type fails the cast
     * @throws Exception whatever the listener throws
     */
    protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
        ReferenceRecord record = (ReferenceRecord) msg;
        listener.recoverMessageReference(new MessageId(record.getMessageId()));
    }

    /**
     * Reports every stored record to the listener, in container order, then calls
     * {@link MessageRecoveryListener#finished()}.
     *
     * @param listener receiver of the recovered references
     * @throws Exception whatever the listener throws
     */
    public synchronized void recover(MessageRecoveryListener listener) throws Exception {
        for (StoreEntry entry = messageContainer.getFirst(); entry != null;
                entry = messageContainer.getNext(entry)) {
            ReferenceRecord record = messageContainer.getValue(entry);
            // Skip entries without a value, matching recoverNextMessages.
            if (record != null) {
                // Pass the record itself: recover(listener, Object) casts its argument to
                // ReferenceRecord, so handing it a MessageId would throw ClassCastException.
                recover(listener, record);
            }
        }
        listener.finished();
    }

    /**
     * Reports the next batch of records to the listener, resuming after the entry reached
     * by the previous call (or from the first entry when no batch position is held), then
     * calls {@link MessageRecoveryListener#finished()}.
     *
     * @param maxReturned upper bound on the number of records reported by this call
     * @param listener    receiver of the recovered references; the batch also stops once
     *                    {@code listener.hasSpace()} returns {@code false}
     * @throws Exception whatever the listener throws
     */
    public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
            throws Exception {
        StoreEntry entry = batchEntry;
        if (entry == null) {
            entry = messageContainer.getFirst();
        } else {
            // Re-read the remembered entry before following its link; refresh may yield
            // null, in which case there is nothing to resume from.
            entry = messageContainer.refresh(entry);
            if (entry != null) {
                entry = messageContainer.getNext(entry);
            }
        }
        if (entry != null) {
            int count = 0;
            do {
                Object msg = messageContainer.getValue(entry);
                if (msg != null) {
                    recover(listener, msg);
                    count++;
                }
                // Remember the position even for entries without a value so the next
                // call continues after them.
                batchEntry = entry;
                entry = messageContainer.getNext(entry);
            } while (entry != null && count < maxReturned && listener.hasSpace());
        }
        listener.finished();
    }

    /**
     * Stores a reference for the given message id and registers interest in its record file.
     *
     * @param context   unused by this implementation
     * @param messageId key under which the reference is stored
     * @param data      reference data to store
     * @throws IOException declared by the interface
     */
    public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data)
            throws IOException {
        ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
        messageContainer.put(messageId, record);
        addInterest(record);
    }

    /**
     * Looks up the reference data stored for a message id.
     *
     * @param identity id to look up
     * @return the stored data, or {@code null} when the container has no record for the id
     * @throws IOException declared by the interface
     */
    public ReferenceData getMessageReference(MessageId identity) throws IOException {
        ReferenceRecord result = messageContainer.get(identity);
        if (result == null) {
            return null;
        }
        return result.getData();
    }

    /**
     * Registers interest with the adapter for the record file of every stored record.
     */
    public void addReferenceFileIdsInUse() {
        for (StoreEntry entry = messageContainer.getFirst(); entry != null;
                entry = messageContainer.getNext(entry)) {
            ReferenceRecord record = messageContainer.getValue(entry);
            // Skip entries without a value, matching recoverNextMessages.
            if (record != null) {
                addInterest(record);
            }
        }
    }

    /**
     * Removes the record for the acknowledged message, using {@code ack.getLastMessageId()}.
     *
     * @param context unused by this implementation
     * @param ack     acknowledgement identifying the message to remove
     * @throws IOException declared by the interface
     */
    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
        removeMessage(ack.getLastMessageId());
    }

    /**
     * Removes the record stored for a message id, releasing its record-file interest.
     * The batch position is reset when the container becomes empty or when the removed
     * entry is the one the batch position points at.
     *
     * @param msgId id of the record to remove; unknown ids are ignored
     * @throws IOException declared by the interface
     */
    public synchronized void removeMessage(MessageId msgId) throws IOException {
        StoreEntry entry = messageContainer.getEntry(msgId);
        if (entry == null) {
            return;
        }
        ReferenceRecord removed = messageContainer.remove(msgId);
        if (removed == null) {
            return;
        }
        removeInterest(removed);
        boolean removedBatchEntry = batchEntry != null && batchEntry.equals(entry);
        if (messageContainer.isEmpty() || removedBatchEntry) {
            resetBatching();
        }
    }

    /**
     * Removes every record, releasing the record-file interest each one holds and
     * resetting the batch position.
     *
     * @param context unused by this implementation
     * @throws IOException declared by the interface
     */
    public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
        clearAllReferences();
    }

    /** @return the destination this store was created for */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    /**
     * Empties the store, releasing the record-file interest each record holds and
     * resetting the batch position.
     */
    public synchronized void delete() {
        clearAllReferences();
    }

    /** Forgets the batch position so the next {@link #recoverNextMessages} starts from the first entry. */
    public void resetBatching() {
        batchEntry = null;
    }

    /** @return the number of records currently in the container */
    public int getMessageCount() {
        return messageContainer.size();
    }

    /** No-op; the usage manager is ignored by this implementation. */
    public void setUsageManager(UsageManager usageManager) {
    }

    /** @return always {@code true} */
    public boolean isSupportForCursors() {
        return true;
    }

    /** @return always {@code true} */
    public boolean supportsExternalBatchControl() {
        return true;
    }

    /** Releases the adapter's interest in the record file referenced by {@code rr}. */
    void removeInterest(ReferenceRecord rr) {
        adapter.removeInterestInRecordFile(rr.getData().getFileId());
    }

    /** Registers the adapter's interest in the record file referenced by {@code rr}. */
    void addInterest(ReferenceRecord rr) {
        adapter.addInterestInRecordFile(rr.getData().getFileId());
    }

    /**
     * No-op in this implementation.
     *
     * @param startAfter ignored
     */
    public void setBatch(MessageId startAfter) {
    }

    /**
     * Releases interest for every stored record, empties the container and drops the
     * batch position. Interest must be released before clearing because the file id is
     * only reachable through the stored record.
     */
    private void clearAllReferences() {
        for (StoreEntry entry = messageContainer.getFirst(); entry != null;
                entry = messageContainer.getNext(entry)) {
            ReferenceRecord record = messageContainer.getValue(entry);
            if (record != null) {
                removeInterest(record);
            }
        }
        messageContainer.clear();
        // The remembered entry no longer exists once the container is cleared.
        resetBatching();
    }
}