1 14 15 package org.apache.activemq.broker.region.cursors; 16 17 import java.io.IOException ; 18 import java.util.LinkedList ; 19 import org.apache.activemq.broker.region.Destination; 20 import org.apache.activemq.broker.region.MessageReference; 21 import org.apache.activemq.broker.region.Topic; 22 import org.apache.activemq.command.Message; 23 import org.apache.activemq.command.MessageId; 24 import org.apache.activemq.store.MessageRecoveryListener; 25 import org.apache.activemq.store.TopicMessageStore; 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 29 34 class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{ 35 36 static private final Log log=LogFactory.getLog(TopicStorePrefetch.class); 37 private TopicMessageStore store; 38 private final LinkedList <Message> batchList=new LinkedList <Message>(); 39 private String clientId; 40 private String subscriberName; 41 private Destination regionDestination; 42 private MessageId firstMessageId; 43 private MessageId lastMessageId; 44 private int pendingCount; 45 private boolean started; 46 47 52 public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){ 53 this.regionDestination=topic; 54 this.store=(TopicMessageStore)topic.getMessageStore(); 55 this.clientId=clientId; 56 this.subscriberName=subscriberName; 57 } 58 59 public synchronized void start(){ 60 if(!started){ 61 started=true; 62 pendingCount=getStoreSize(); 63 try{ 64 fillBatch(); 65 }catch(Exception e){ 66 log.error("Failed to fill batch",e); 67 throw new RuntimeException (e); 68 } 69 } 70 } 71 72 public synchronized void stop(){ 73 if(started){ 74 started=false; 75 store.resetBatching(clientId,subscriberName); 76 gc(); 77 } 78 } 79 80 83 public boolean isEmpty(){ 84 return pendingCount <= 0; 85 } 86 87 public synchronized int size(){ 88 return getPendingCount(); 89 } 90 91 public synchronized void addMessageLast(MessageReference node) throws Exception { 92 if(node!=null){ 93 if(isEmpty() && started){ 94 firstMessageId=node.getMessageId(); 95 } 96 lastMessageId=node.getMessageId(); 97 node.decrementReferenceCount(); 98 pendingCount++; 99 } 100 } 101 102 public void addMessageFirst(MessageReference node) throws Exception { 103 if(node!=null){ 104 if(started){ 105 firstMessageId=node.getMessageId(); 106 } 107 node.decrementReferenceCount(); 108 pendingCount++; 109 } 110 } 111 112 public synchronized void remove(){ 113 pendingCount--; 114 } 115 116 public synchronized void remove(MessageReference node){ 117 pendingCount--; 118 } 119 120 public synchronized void clear(){ 121 pendingCount=0; 122 } 123 124 public synchronized boolean hasNext(){ 125 return !isEmpty(); 126 } 127 128 public synchronized MessageReference next(){ 129 Message result=null; 130 if(!isEmpty()){ 131 if(batchList.isEmpty()){ 132 try{ 133 fillBatch(); 134 }catch(final Exception e){ 135 log.error("Failed to fill batch",e); 136 throw new RuntimeException (e); 137 } 138 if(batchList.isEmpty()){ 139 return null; 140 } 141 } 142 if(!batchList.isEmpty()){ 143 result=batchList.removeFirst(); 144 if(lastMessageId!=null){ 145 if(result.getMessageId().equals(lastMessageId)){ 146 } 148 } 149 result.setRegionDestination(regionDestination); 150 } 151 } 152 return result; 153 } 154 155 public void reset(){ 156 } 157 158 public void finished(){ 160 } 161 162 public synchronized void recoverMessage(Message message) throws Exception { 163 message.setRegionDestination(regionDestination); 164 if(message.getReferenceCount()==0){ 166 message.incrementReferenceCount(); 167 } 168 batchList.addLast(message); 169 } 170 171 public void recoverMessageReference(MessageId messageReference) throws Exception { 172 throw new RuntimeException ("Not supported"); 174 } 175 176 protected synchronized void fillBatch() throws Exception { 178 if(!isEmpty()){ 179 store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this); 180 if(firstMessageId!=null){ 181 int pos=0; 182 for(Message msg:batchList){ 183 if(msg.getMessageId().equals(firstMessageId)){ 184 firstMessageId=null; 185 break; 186 } 187 pos++; 188 } 189 if(pos>0){ 190 for(int i=0;i<pos&&!batchList.isEmpty();i++){ 191 batchList.removeFirst(); 192 } 193 if(batchList.isEmpty()){ 194 log.debug("Refilling batch - haven't got past first message = " + firstMessageId); 195 fillBatch(); 196 } 197 } 198 } 199 } 200 } 201 202 protected synchronized int getPendingCount(){ 203 if(pendingCount <= 0){ 204 pendingCount = getStoreSize(); 205 } 206 return pendingCount; 207 } 208 209 protected synchronized int getStoreSize(){ 210 try{ 211 return store.getMessageCount(clientId,subscriberName); 212 }catch(IOException e){ 213 log.error(this+" Failed to get the outstanding message count from the store",e); 214 throw new RuntimeException (e); 215 } 216 } 217 218 219 220 public synchronized void gc(){ 221 for(Message msg:batchList){ 222 msg.decrementReferenceCount(); 223 } 224 batchList.clear(); 225 } 226 227 public String toString(){ 228 return "TopicStorePrefetch"+System.identityHashCode(this)+"("+clientId+","+subscriberName+")"; 229 } 230 } 231 | Popular Tags |