1 14 15 package org.apache.activemq.broker.region.cursors; 16 17 import org.apache.activemq.broker.region.MessageReference; 18 import org.apache.activemq.broker.region.Queue; 19 import org.apache.activemq.command.Message; 20 import org.apache.activemq.kaha.Store; 21 import org.apache.activemq.memory.UsageManager; 22 import org.apache.commons.logging.Log; 23 import org.apache.commons.logging.LogFactory; 24 25 30 public class StoreQueueCursor extends AbstractPendingMessageCursor{ 31 32 static private final Log log=LogFactory.getLog(StoreQueueCursor.class); 33 private int pendingCount=0; 34 private Queue queue; 35 private Store tmpStore; 36 private PendingMessageCursor nonPersistent; 37 private QueueStorePrefetch persistent; 38 private boolean started; 39 private PendingMessageCursor currentCursor; 40 41 42 48 public StoreQueueCursor(Queue queue,Store tmpStore){ 49 this.queue=queue; 50 this.tmpStore=tmpStore; 51 this.persistent=new QueueStorePrefetch(queue); 52 currentCursor = persistent; 53 } 54 55 public synchronized void start() throws Exception { 56 started=true; 57 if(nonPersistent==null){ 58 nonPersistent=new FilePendingMessageCursor(queue.getDestination(),tmpStore); 59 nonPersistent.setMaxBatchSize(getMaxBatchSize()); 60 nonPersistent.setUsageManager(usageManager); 61 } 62 nonPersistent.start(); 63 persistent.start(); 64 pendingCount=persistent.size() + nonPersistent.size(); 65 } 66 67 public synchronized void stop() throws Exception { 68 started=false; 69 if(nonPersistent!=null){ 70 nonPersistent.stop(); 71 nonPersistent.gc(); 72 } 73 persistent.stop(); 74 persistent.gc(); 75 pendingCount=0; 76 } 77 78 public synchronized void addMessageLast(MessageReference node) throws Exception { 79 if(node!=null){ 80 Message msg=node.getMessage(); 81 if(started){ 82 pendingCount++; 83 if(!msg.isPersistent()){ 84 nonPersistent.addMessageLast(node); 85 } 86 } 87 if(msg.isPersistent()){ 88 persistent.addMessageLast(node); 89 } 90 } 91 } 92 93 public void addMessageFirst(MessageReference node) throws Exception { 94 if(node!=null){ 95 Message msg=node.getMessage(); 96 if(started){ 97 pendingCount++; 98 if(!msg.isPersistent()){ 99 nonPersistent.addMessageFirst(node); 100 } 101 } 102 if(msg.isPersistent()){ 103 persistent.addMessageFirst(node); 104 } 105 } 106 } 107 108 public void clear(){ 109 pendingCount=0; 110 } 111 112 public synchronized boolean hasNext(){ 113 114 boolean result=pendingCount>0; 115 if(result){ 116 try{ 117 currentCursor=getNextCursor(); 118 }catch(Exception e){ 119 log.error("Failed to get current cursor ",e); 120 throw new RuntimeException (e); 121 } 122 result=currentCursor!=null?currentCursor.hasNext():false; 123 } 124 return result; 125 } 126 127 public synchronized MessageReference next(){ 128 MessageReference result = currentCursor!=null?currentCursor.next():null; 129 return result; 130 } 131 132 public synchronized void remove(){ 133 if(currentCursor!=null){ 134 currentCursor.remove(); 135 } 136 pendingCount--; 137 } 138 139 public synchronized void remove(MessageReference node){ 140 if (!node.isPersistent()) { 141 nonPersistent.remove(node); 142 }else { 143 persistent.remove(node); 144 } 145 pendingCount--; 146 } 147 148 public synchronized void reset(){ 149 nonPersistent.reset(); 150 persistent.reset(); 151 } 152 153 public int size(){ 154 return pendingCount; 155 } 156 157 public synchronized boolean isEmpty(){ 158 return pendingCount<=0; 159 } 160 161 168 public boolean isRecoveryRequired(){ 169 return false; 170 } 171 172 175 public PendingMessageCursor getNonPersistent(){ 176 return this.nonPersistent; 177 } 178 179 182 public void setNonPersistent(PendingMessageCursor nonPersistent){ 183 this.nonPersistent=nonPersistent; 184 } 185 186 public void setMaxBatchSize(int maxBatchSize){ 187 persistent.setMaxBatchSize(maxBatchSize); 188 if(nonPersistent!=null){ 189 nonPersistent.setMaxBatchSize(maxBatchSize); 190 } 191 super.setMaxBatchSize(maxBatchSize); 192 } 193 194 public void gc() { 195 if (persistent != null) { 196 persistent.gc(); 197 } 198 if (nonPersistent != null) { 199 nonPersistent.gc(); 200 } 201 } 202 203 public void setUsageManager(UsageManager usageManager){ 204 super.setUsageManager(usageManager); 205 if (persistent != null) { 206 persistent.setUsageManager(usageManager); 207 } 208 if (nonPersistent != null) { 209 nonPersistent.setUsageManager(usageManager); 210 } 211 } 212 213 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 214 if(currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()){ 215 currentCursor=currentCursor==persistent?nonPersistent:persistent; 216 if (currentCursor.isEmpty()) { 218 currentCursor=currentCursor==persistent?nonPersistent:persistent; 219 } 220 } 221 return currentCursor; 222 } 223 } 224 | Popular Tags |