1 18 package org.apache.activemq.spring; 19 20 import javax.jms.Message ; 21 import javax.jms.MessageListener ; 22 import java.util.ArrayList ; 23 import java.util.List ; 24 25 import junit.framework.Assert; 26 27 public class ConsumerBean extends Assert implements MessageListener { 28 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 29 .getLog(ConsumerBean.class); 30 31 private List messages = new ArrayList (); 32 private Object semaphore; 33 private boolean verbose; 34 35 38 public ConsumerBean() { 39 this(new Object ()); 40 } 41 42 47 public ConsumerBean(Object semaphore) { 48 this.semaphore = semaphore; 49 } 50 51 54 public synchronized List flushMessages() { 55 List answer = new ArrayList (messages); 56 messages.clear(); 57 return answer; 58 } 59 60 65 public synchronized void onMessage(Message message) { 66 messages.add(message); 67 if (verbose) { 68 log.info("Received: " + message); 69 } 70 synchronized (semaphore) { 71 semaphore.notifyAll(); 72 } 73 } 74 75 78 public void waitForMessageToArrive() { 79 log.info("Waiting for message to arrive"); 80 81 long start = System.currentTimeMillis(); 82 83 try { 84 if (hasReceivedMessage()) { 85 synchronized (semaphore) { 86 semaphore.wait(4000); 87 } 88 } 89 } 90 catch (InterruptedException e) { 91 log.info("Caught: " + e); 92 } 93 long end = System.currentTimeMillis() - start; 94 95 log.info("End of wait for " + end + " millis"); 96 } 97 98 103 public void waitForMessagesToArrive(int messageCount) { 104 log.info("Waiting for message to arrive"); 105 106 long start = System.currentTimeMillis(); 107 108 for (int i = 0; i < 10; i++) { 109 try { 110 if (hasReceivedMessages(messageCount)) { 111 break; 112 } 113 synchronized (semaphore) { 114 semaphore.wait(1000); 115 } 116 } 117 catch (InterruptedException e) { 118 log.info("Caught: " + e); 119 } 120 } 121 long end = System.currentTimeMillis() - start; 122 123 log.info("End of wait for " + end + " millis"); 124 } 125 126 public void assertMessagesArrived(int total) { 127 waitForMessagesToArrive(total); 128 synchronized (this) { 129 int count = messages.size(); 130 131 assertEquals("Messages received", total, count); 132 } 133 } 134 135 public boolean isVerbose() { 136 return verbose; 137 } 138 139 public void setVerbose(boolean verbose) { 140 this.verbose = verbose; 141 } 142 143 148 protected boolean hasReceivedMessage() { 149 return messages.isEmpty(); 150 } 151 152 158 protected synchronized boolean hasReceivedMessages(int messageCount) { 159 return messages.size() >= messageCount; 160 } 161 } 162 | Popular Tags |