1 17 package org.apache.servicemix.tck; 18 19 import javax.jbi.messaging.MessageExchange; 20 import javax.jbi.messaging.MessagingException; 21 import javax.jbi.messaging.NormalizedMessage; 22 23 import org.apache.commons.logging.Log; 24 import org.apache.commons.logging.LogFactory; 25 import org.apache.servicemix.client.Message; 26 import org.apache.servicemix.client.MessageListener; 27 28 import java.util.ArrayList ; 29 import java.util.List ; 30 31 import junit.framework.Assert; 32 33 38 public class MessageList extends Assert implements MessageListener { 39 40 private static final Log log = LogFactory.getLog(MessageList.class); 41 42 private List messages = new ArrayList (); 43 44 private Object semaphore; 45 46 public MessageList() { 47 this(new Object ()); 48 } 49 50 public MessageList(Object semaphore) { 51 this.semaphore = semaphore; 52 } 53 54 57 public List flushMessages() { 58 synchronized (semaphore) { 59 List answer = new ArrayList (messages); 60 messages.clear(); 61 return answer; 62 } 63 } 64 65 public synchronized List getMessages() { 66 synchronized (semaphore) { 67 return new ArrayList (messages); 68 } 69 } 70 71 public void addMessage(NormalizedMessage message) throws MessagingException { 72 synchronized (semaphore) { 73 messages.add(message); 74 semaphore.notifyAll(); 75 } 76 } 77 78 public void addMessage(String message) throws MessagingException { 79 synchronized (semaphore) { 80 messages.add(message); 81 semaphore.notifyAll(); 82 } 83 } 84 85 public int getMessageCount() { 86 synchronized (semaphore) { 87 return messages.size(); 88 } 89 } 90 91 public void waitForMessagesToArrive(int messageCount) { 92 waitForMessagesToArrive(messageCount, 4000); 93 } 94 95 public void waitForMessagesToArrive(int messageCount, long baseTimeout) { 96 log.info("Waiting for message to arrive"); 97 98 long start = System.currentTimeMillis(); 99 100 while (System.currentTimeMillis() - start < baseTimeout + 100 * messageCount) { 101 try { 102 if (hasReceivedMessages(messageCount)) { 103 break; 104 } 105 synchronized (semaphore) { 106 semaphore.wait(4000); 107 } 108 } 109 catch (InterruptedException e) { 110 log.info("Caught: " + e); 111 } 112 } 113 long end = System.currentTimeMillis() - start; 114 115 log.info("End of wait for " + end + " millis"); 116 } 117 118 119 124 public void assertMessagesReceived(int messageCount) { 125 waitForMessagesToArrive(messageCount); 126 127 assertEquals("expected number of messages when received: " + getMessages(), messageCount, getMessageCount()); 128 } 129 130 public boolean hasReceivedMessage() { 131 return getMessageCount() > 0; 132 } 133 134 public boolean hasReceivedMessages(int messageCount) { 135 return getMessageCount() >= messageCount; 136 } 137 138 public void onMessage(MessageExchange exchange, Message message) throws Exception { 141 addMessage(message); 142 } 143 } 144 | Popular Tags |