1 18 19 package org.apache.activemq.broker.region.cursors; 20 21 import java.io.IOException ; 22 import java.util.LinkedList ; 23 import org.apache.activemq.broker.region.Destination; 24 import org.apache.activemq.broker.region.MessageReference; 25 import org.apache.activemq.broker.region.Queue; 26 import org.apache.activemq.command.Message; 27 import org.apache.activemq.command.MessageId; 28 import org.apache.activemq.store.MessageRecoveryListener; 29 import org.apache.activemq.store.MessageStore; 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 33 39 class QueueStorePrefetch extends AbstractPendingMessageCursor implements 40 MessageRecoveryListener { 41 42 static private final Log log=LogFactory.getLog(QueueStorePrefetch.class); 43 44 private MessageStore store; 45 private final LinkedList <Message>batchList=new LinkedList <Message>(); 46 private Destination regionDestination; 47 private int size = 0; 48 49 55 public QueueStorePrefetch(Queue queue){ 56 this.regionDestination = queue; 57 this.store=(MessageStore)queue.getMessageStore(); 58 59 } 60 61 public void start() throws Exception { 62 } 63 64 public void stop() throws Exception { 65 store.resetBatching(); 66 gc(); 67 } 68 69 72 public boolean isEmpty(){ 73 return size <= 0; 74 } 75 76 public boolean hasMessagesBufferedToDeliver() { 77 return !batchList.isEmpty(); 78 } 79 80 public synchronized int size(){ 81 try { 82 size = store.getMessageCount(); 83 }catch(IOException e) { 84 log.error("Failed to get message count",e); 85 throw new RuntimeException (e); 86 } 87 return size; 88 } 89 90 public synchronized void addMessageLast(MessageReference node) throws Exception { 91 size++; 92 } 93 94 public void addMessageFirst(MessageReference node) throws Exception { 95 size++; 96 } 97 98 public void remove(){ 99 size--; 100 } 101 102 public void remove(MessageReference node){ 103 size--; 104 } 105 106 107 public synchronized boolean hasNext(){ 108 if(batchList.isEmpty()){ 109 try{ 110 fillBatch(); 111 }catch(Exception e){ 112 log.error("Failed to fill batch",e); 113 throw new RuntimeException (e); 114 } 115 } 116 return !batchList.isEmpty(); 117 } 118 119 public synchronized MessageReference next(){ 120 Message result = batchList.removeFirst(); 121 result.decrementReferenceCount(); 122 result.setRegionDestination(regionDestination); 123 return result; 124 } 125 126 public void reset(){ 127 } 128 129 public void finished(){ 131 } 132 133 public void recoverMessage(Message message) throws Exception { 134 message.setRegionDestination(regionDestination); 135 message.incrementReferenceCount(); 136 batchList.addLast(message); 137 } 138 139 public void recoverMessageReference(MessageId messageReference) throws Exception { 140 Message msg=store.getMessage(messageReference); 141 if(msg!=null){ 142 recoverMessage(msg); 143 }else{ 144 String err = "Failed to retrieve message for id: "+messageReference; 145 log.error(err); 146 throw new IOException (err); 147 } 148 } 149 150 public void gc() { 151 for (Message msg:batchList) { 152 msg.decrementReferenceCount(); 153 } 154 batchList.clear(); 155 } 156 157 protected void fillBatch() throws Exception { 159 store.recoverNextMessages(maxBatchSize,this); 160 } 161 162 public String toString() { 163 return "QueueStorePrefetch" + System.identityHashCode(this) ; 164 } 165 166 } 167 | Popular Tags |