1 18 package org.apache.activemq.util; 19 20 import java.util.ArrayList ; 21 import java.util.List ; 22 import java.util.concurrent.CountDownLatch ; 23 24 import javax.jms.JMSException ; 25 import javax.jms.Message ; 26 import javax.jms.MessageListener ; 27 28 import junit.framework.Assert; 29 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 33 45 public class MessageIdList extends Assert implements MessageListener { 46 47 protected static final Log log = LogFactory.getLog(MessageIdList.class); 48 49 private List messageIds = new ArrayList (); 50 private Object semaphore; 51 private boolean verbose; 52 private MessageListener parent; 53 private long maximumDuration = 15000L; 54 55 private CountDownLatch countDownLatch; 56 57 public MessageIdList() { 58 this(new Object ()); 59 } 60 61 public MessageIdList(Object semaphore) { 62 this.semaphore = semaphore; 63 } 64 65 public boolean equals(Object that) { 66 if (that instanceof MessageIdList) { 67 MessageIdList thatList = (MessageIdList) that; 68 return getMessageIds().equals(thatList.getMessageIds()); 69 } 70 return false; 71 } 72 73 public int hashCode() { 74 synchronized (semaphore) { 75 return messageIds.hashCode() + 1; 76 } 77 } 78 79 public String toString() { 80 synchronized (semaphore) { 81 return messageIds.toString(); 82 } 83 } 84 85 88 public List flushMessages() { 89 synchronized (semaphore) { 90 List answer = new ArrayList (messageIds); 91 messageIds.clear(); 92 return answer; 93 } 94 } 95 96 public synchronized List getMessageIds() { 97 synchronized (semaphore) { 98 return new ArrayList (messageIds); 99 } 100 } 101 102 public void onMessage(Message message) { 103 String id=null; 104 try { 105 if( countDownLatch != null ) 106 countDownLatch.countDown(); 107 108 id = message.getJMSMessageID(); 109 synchronized (semaphore) { 110 messageIds.add(id); 111 semaphore.notifyAll(); 112 } 113 if (log.isDebugEnabled()) { 114 log.debug("Received message: " + message); 115 } 116 } catch (JMSException e) { 117 e.printStackTrace(); 118 } 119 if (parent != null) { 120 parent.onMessage(message); 121 } 122 } 123 124 public int getMessageCount() { 125 synchronized (semaphore) { 126 return messageIds.size(); 127 } 128 } 129 130 public void waitForMessagesToArrive(int messageCount) { 131 log.info("Waiting for " + messageCount + " message(s) to arrive"); 132 133 long start = System.currentTimeMillis(); 134 135 for (int i = 0; i < messageCount; i++) { 136 try { 137 if (hasReceivedMessages(messageCount)) { 138 break; 139 } 140 long duration = System.currentTimeMillis() - start; 141 if (duration >= maximumDuration ) { 142 break; 143 } 144 synchronized (semaphore) { 145 semaphore.wait(maximumDuration-duration); 146 } 147 } 148 catch (InterruptedException e) { 149 log.info("Caught: " + e); 150 } 151 } 152 long end = System.currentTimeMillis() - start; 153 154 log.info("End of wait for " + end + " millis and received: " + getMessageCount() + " messages"); 155 } 156 157 163 public void assertMessagesReceivedNoWait(int messageCount) { 164 assertEquals("expected number of messages when received", messageCount, getMessageCount()); 165 } 166 167 173 public void assertMessagesReceived(int messageCount) { 174 waitForMessagesToArrive(messageCount); 175 176 assertMessagesReceivedNoWait(messageCount); 177 } 178 179 182 public void assertAtLeastMessagesReceived(int messageCount) { 183 int actual = getMessageCount(); 184 assertTrue("at least: " + messageCount + " messages received. Actual: " + actual, actual >= messageCount); 185 } 186 187 191 public void assertAtMostMessagesReceived(int messageCount) { 192 int actual = getMessageCount(); 193 assertTrue("at most: " + messageCount + " messages received. Actual: " + actual, actual <= messageCount); 194 } 195 196 public boolean hasReceivedMessage() { 197 return getMessageCount() == 0; 198 } 199 200 public boolean hasReceivedMessages(int messageCount) { 201 return getMessageCount() >= messageCount; 202 } 203 204 public boolean isVerbose() { 205 return verbose; 206 } 207 208 public void setVerbose(boolean verbose) { 209 this.verbose = verbose; 210 } 211 212 public MessageListener getParent() { 213 return parent; 214 } 215 216 220 public void setParent(MessageListener parent) { 221 this.parent = parent; 222 } 223 224 225 228 public long getMaximumDuration(){ 229 return this.maximumDuration; 230 } 231 232 233 236 public void setMaximumDuration(long maximumDuration){ 237 this.maximumDuration=maximumDuration; 238 } 239 240 public void setCountDownLatch(CountDownLatch countDownLatch) { 241 this.countDownLatch = countDownLatch; 242 } 243 244 } 245 | Popular Tags |