1 14 15 package org.apache.activemq.broker.region.cursors; 16 17 import java.io.IOException ; 18 import java.util.Iterator ; 19 import java.util.LinkedList ; 20 import java.util.concurrent.atomic.AtomicBoolean ; 21 import java.util.concurrent.atomic.AtomicLong ; 22 import org.apache.activemq.broker.region.Destination; 23 import org.apache.activemq.broker.region.MessageReference; 24 import org.apache.activemq.command.Message; 25 import org.apache.activemq.kaha.CommandMarshaller; 26 import org.apache.activemq.kaha.ListContainer; 27 import org.apache.activemq.kaha.Store; 28 import org.apache.activemq.memory.UsageListener; 29 import org.apache.activemq.memory.UsageManager; 30 import org.apache.activemq.openwire.OpenWireFormat; 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 34 39 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{ 40 41 static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class); 42 static private final AtomicLong nameCount = new AtomicLong (); 43 private Store store; 44 private String name; 45 private LinkedList memoryList=new LinkedList (); 46 private ListContainer diskList; 47 private Iterator iter=null; 48 private Destination regionDestination; 49 private boolean iterating; 50 private boolean flushRequired; 51 private AtomicBoolean started=new AtomicBoolean (); 52 53 57 public FilePendingMessageCursor(String name,Store store){ 58 this.name=nameCount.incrementAndGet() + "_"+name; 59 this.store=store; 60 } 61 62 public void start(){ 63 if(started.compareAndSet(false,true)){ 64 if(usageManager!=null){ 65 usageManager.addUsageListener(this); 66 } 67 } 68 } 69 70 public void stop(){ 71 if(started.compareAndSet(true,false)){ 72 gc(); 73 if(usageManager!=null){ 74 usageManager.removeUsageListener(this); 75 } 76 } 77 } 78 79 82 public synchronized boolean isEmpty(){ 83 return memoryList.isEmpty()&&isDiskListEmpty(); 84 } 85 86 90 public synchronized void reset(){ 91 iterating=true; 92 iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator(); 93 } 94 95 public synchronized void release(){ 96 iterating=false; 97 if(flushRequired){ 98 flushRequired=false; 99 flushToDisk(); 100 } 101 } 102 103 public synchronized void destroy(){ 104 stop(); 105 for(Iterator i=memoryList.iterator();i.hasNext();){ 106 Message node=(Message)i.next(); 107 node.decrementReferenceCount(); 108 } 109 memoryList.clear(); 110 if(!isDiskListEmpty()){ 111 getDiskList().clear(); 112 } 113 } 114 115 public synchronized LinkedList pageInList(int maxItems){ 116 LinkedList result=new LinkedList (); 117 int count=0; 118 for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){ 119 result.add(i.next()); 120 count++; 121 } 122 if(count<maxItems&&!isDiskListEmpty()){ 123 for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){ 124 Message message=(Message)i.next(); 125 message.setRegionDestination(regionDestination); 126 message.incrementReferenceCount(); 127 result.add(message); 128 count++; 129 } 130 } 131 return result; 132 } 133 134 139 public synchronized void addMessageLast(MessageReference node){ 140 try{ 141 regionDestination=node.getMessage().getRegionDestination(); 142 if(isSpaceInMemoryList()){ 143 memoryList.add(node); 144 }else{ 145 flushToDisk(); 146 node.decrementReferenceCount(); 147 getDiskList().addLast(node); 148 } 149 }catch(IOException e){ 150 throw new RuntimeException (e); 151 } 152 } 153 154 159 public synchronized void addMessageFirst(MessageReference node){ 160 try{ 161 regionDestination=node.getMessage().getRegionDestination(); 162 if(isSpaceInMemoryList()){ 163 memoryList.addFirst(node); 164 }else{ 165 flushToDisk(); 166 node.decrementReferenceCount(); 167 getDiskList().addFirst(node); 168 } 169 }catch(IOException e){ 170 throw new RuntimeException (e); 171 } 172 } 173 174 177 public synchronized boolean hasNext(){ 178 return iter.hasNext(); 179 } 180 181 184 public synchronized MessageReference next(){ 185 Message message=(Message)iter.next(); 186 if(!isDiskListEmpty()){ 187 message.setRegionDestination(regionDestination); 189 message.incrementReferenceCount(); 190 } 191 return message; 192 } 193 194 198 public synchronized void remove(){ 199 iter.remove(); 200 } 201 202 206 public synchronized void remove(MessageReference node){ 207 memoryList.remove(node); 208 if(!isDiskListEmpty()){ 209 getDiskList().remove(node); 210 } 211 } 212 213 216 public synchronized int size(){ 217 return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size()); 218 } 219 220 224 public synchronized void clear(){ 225 memoryList.clear(); 226 if(!isDiskListEmpty()){ 227 getDiskList().clear(); 228 } 229 } 230 231 public synchronized boolean isFull(){ 232 return false; 234 } 235 236 public boolean hasMessagesBufferedToDeliver(){ 237 return !isEmpty(); 238 } 239 240 public void setUsageManager(UsageManager usageManager){ 241 super.setUsageManager(usageManager); 242 usageManager.addUsageListener(this); 243 } 244 245 public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ 246 if(newPercentUsage>=getMemoryUsageHighWaterMark()){ 247 synchronized(this){ 248 flushRequired=true; 249 if(!iterating){ 250 flushToDisk(); 251 flushRequired=false; 252 } 253 } 254 } 255 } 256 257 protected boolean isSpaceInMemoryList(){ 258 return hasSpace()&&isDiskListEmpty(); 259 } 260 261 protected synchronized void flushToDisk(){ 262 if(!memoryList.isEmpty()){ 263 while(!memoryList.isEmpty()){ 264 MessageReference node=(MessageReference)memoryList.removeFirst(); 265 node.decrementReferenceCount(); 266 getDiskList().addLast(node); 267 } 268 memoryList.clear(); 269 } 270 } 271 272 protected boolean isDiskListEmpty(){ 273 return diskList==null||diskList.isEmpty(); 274 } 275 276 protected ListContainer getDiskList(){ 277 if(diskList==null){ 278 try{ 279 diskList=store.getListContainer(name,"TopicSubscription",true); 280 diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat())); 281 }catch(IOException e){ 282 e.printStackTrace(); 283 throw new RuntimeException (e); 284 } 285 } 286 return diskList; 287 } 288 } 289 | Popular Tags |