1 14 15 package org.apache.activemq.store.memory; 16 17 import java.util.Iterator ; 18 import java.util.LinkedHashMap ; 19 import java.util.Map ; 20 import java.util.Map.Entry; 21 import org.apache.activemq.command.Message; 22 import org.apache.activemq.command.MessageId; 23 import org.apache.activemq.store.MessageRecoveryListener; 24 25 30 class MemoryTopicSub{ 31 32 private Map map=new LinkedHashMap (); 33 private MessageId lastBatch; 34 35 void addMessage(MessageId id,Message message){ 36 map.put(id,message); 37 } 38 39 void removeMessage(MessageId id){ 40 map.remove(id); 41 if (map.isEmpty()) { 42 lastBatch=null; 43 } 44 } 45 46 int size(){ 47 return map.size(); 48 } 49 50 void recoverSubscription(MessageRecoveryListener listener) throws Exception { 51 for(Iterator iter=map.entrySet().iterator();iter.hasNext();){ 52 Map.Entry entry=(Entry)iter.next(); 53 Object msg=entry.getValue(); 54 if(msg.getClass()==MessageId.class){ 55 listener.recoverMessageReference((MessageId)msg); 56 }else{ 57 listener.recoverMessage((Message)msg); 58 } 59 } 60 listener.finished(); 61 } 62 63 void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception { 64 boolean pastLackBatch=lastBatch==null; 65 MessageId lastId=null; 66 int count=0; 68 for(Iterator iter=map.entrySet().iterator();iter.hasNext()&&count<maxReturned;){ 69 Map.Entry entry=(Entry)iter.next(); 70 if(pastLackBatch){ 71 count++; 72 Object msg=entry.getValue(); 73 lastId=(MessageId)entry.getKey(); 74 if(msg.getClass()==MessageId.class){ 75 listener.recoverMessageReference((MessageId)msg); 76 }else{ 77 listener.recoverMessage((Message)msg); 78 } 79 }else{ 80 pastLackBatch=entry.getKey().equals(lastBatch); 81 } 82 } 83 if(lastId!=null){ 84 lastBatch=lastId; 85 } 86 listener.finished(); 87 } 88 89 void resetBatching(){ 90 lastBatch=null; 91 } 92 } 93 | Popular Tags |