1 14 15 package org.apache.activemq.broker.region.cursors; 16 17 import java.io.IOException ; 18 import java.util.HashMap ; 19 import java.util.Iterator ; 20 import java.util.LinkedList ; 21 import java.util.Map ; 22 import org.apache.activemq.advisory.AdvisorySupport; 23 import org.apache.activemq.broker.ConnectionContext; 24 import org.apache.activemq.broker.region.Destination; 25 import org.apache.activemq.broker.region.MessageReference; 26 import org.apache.activemq.broker.region.Topic; 27 import org.apache.activemq.command.Message; 28 import org.apache.activemq.kaha.Store; 29 import org.apache.activemq.memory.UsageManager; 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 33 38 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ 39 40 static private final Log log=LogFactory.getLog(StoreDurableSubscriberCursor.class); 41 private int pendingCount=0; 42 private String clientId; 43 private String subscriberName; 44 private Map topics=new HashMap (); 45 private LinkedList <PendingMessageCursor>storePrefetches=new LinkedList <PendingMessageCursor>(); 46 private boolean started; 47 private PendingMessageCursor nonPersistent; 48 private PendingMessageCursor currentCursor; 49 50 56 public StoreDurableSubscriberCursor(String clientId,String subscriberName,Store store,int maxBatchSize){ 57 this.clientId=clientId; 58 this.subscriberName=subscriberName; 59 this.nonPersistent=new FilePendingMessageCursor(clientId+subscriberName,store); 60 storePrefetches.add(nonPersistent); 61 } 62 63 public synchronized void start() throws Exception { 64 if(!started){ 65 started=true; 66 for(PendingMessageCursor tsp: storePrefetches){ 67 tsp.start(); 68 pendingCount+=tsp.size(); 69 } 70 } 71 } 72 73 public synchronized void stop() throws Exception { 74 if(started){ 75 started=false; 76 for(PendingMessageCursor tsp: storePrefetches){ 77 tsp.stop(); 78 } 79 80 pendingCount=0; 81 } 82 } 83 84 91 public synchronized void add(ConnectionContext context,Destination destination) throws Exception { 92 if(destination!=null&&!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())){ 93 TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName); 94 tsp.setMaxBatchSize(getMaxBatchSize()); 95 tsp.setUsageManager(usageManager); 96 topics.put(destination,tsp); 97 storePrefetches.add(tsp); 98 if(started){ 99 tsp.start(); 100 pendingCount+=tsp.size(); 101 } 102 } 103 } 104 105 112 public synchronized void remove(ConnectionContext context,Destination destination) throws Exception { 113 Object tsp=topics.remove(destination); 114 if(tsp!=null){ 115 storePrefetches.remove(tsp); 116 } 117 } 118 119 122 public synchronized boolean isEmpty(){ 123 return pendingCount<=0; 124 } 125 126 public boolean isEmpty(Destination destination){ 127 boolean result=true; 128 TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(destination); 129 if(tsp!=null){ 130 result=tsp.size()<=0; 131 } 132 return result; 133 } 134 135 142 public boolean isRecoveryRequired(){ 143 return false; 144 } 145 146 public synchronized void addMessageLast(MessageReference node) throws Exception { 147 if(node!=null){ 148 Message msg=node.getMessage(); 149 if(started){ 150 pendingCount++; 151 if(!msg.isPersistent()){ 152 nonPersistent.addMessageLast(node); 153 } 154 } 155 if(msg.isPersistent()){ 156 Destination dest=msg.getRegionDestination(); 157 TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest); 158 if(tsp!=null){ 159 tsp.addMessageLast(node); 160 } 161 } 162 } 163 } 164 165 public void addRecoveredMessage(MessageReference node) throws Exception { 166 nonPersistent.addMessageLast(node); 167 } 168 169 public void clear(){ 170 pendingCount=0; 171 nonPersistent.clear(); 172 for(PendingMessageCursor tsp: storePrefetches){ 173 tsp.clear(); 174 } 175 } 176 177 public synchronized boolean hasNext(){ 178 boolean result=pendingCount>0; 179 if(result){ 180 try{ 181 currentCursor=getNextCursor(); 182 }catch(Exception e){ 183 log.error("Failed to get current cursor ",e); 184 throw new RuntimeException (e); 185 } 186 result=currentCursor!=null?currentCursor.hasNext():false; 187 } 188 return result; 189 } 190 191 public synchronized MessageReference next(){ 192 MessageReference result = currentCursor!=null?currentCursor.next():null; 193 return result; 194 } 195 196 public synchronized void remove(){ 197 if(currentCursor!=null){ 198 currentCursor.remove(); 199 } 200 pendingCount--; 201 } 202 203 public synchronized void remove(MessageReference node){ 204 if(currentCursor!=null){ 205 currentCursor.remove(node); 206 } 207 pendingCount--; 208 } 209 210 public synchronized void reset(){ 211 for(Iterator i=storePrefetches.iterator();i.hasNext();){ 212 AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); 213 tsp.reset(); 214 } 215 } 216 217 public synchronized void release(){ 218 for(Iterator i=storePrefetches.iterator();i.hasNext();){ 219 AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); 220 tsp.release(); 221 } 222 } 223 224 public int size(){ 225 return pendingCount; 226 } 227 228 public synchronized void setMaxBatchSize(int maxBatchSize){ 229 for(Iterator i=storePrefetches.iterator();i.hasNext();){ 230 AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); 231 tsp.setMaxBatchSize(maxBatchSize); 232 } 233 super.setMaxBatchSize(maxBatchSize); 234 } 235 236 public synchronized void gc(){ 237 for(Iterator i=storePrefetches.iterator();i.hasNext();){ 238 PendingMessageCursor tsp=(PendingMessageCursor)i.next(); 239 tsp.gc(); 240 } 241 } 242 243 public synchronized void setUsageManager(UsageManager usageManager){ 244 super.setUsageManager(usageManager); 245 for(Iterator i=storePrefetches.iterator();i.hasNext();){ 246 PendingMessageCursor tsp=(PendingMessageCursor)i.next(); 247 tsp.setUsageManager(usageManager); 248 } 249 } 250 251 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 252 if(currentCursor==null||currentCursor.isEmpty()){ 253 currentCursor=null; 254 for(Iterator i=storePrefetches.iterator();i.hasNext();){ 255 AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); 256 if(tsp.hasNext()){ 257 currentCursor=tsp; 258 break; 259 } 260 } 261 storePrefetches.addLast(storePrefetches.removeFirst()); 263 } 264 return currentCursor; 265 } 266 267 public String toString(){ 268 return "StoreDurableSubscriber("+clientId+":"+subscriberName+")"; 269 } 270 } 271 | Popular Tags |