1 18 package org.apache.activemq.systest.impl; 19 20 import java.util.concurrent.CopyOnWriteArrayList ; 21 22 import javax.jms.Message ; 23 import javax.jms.MessageListener ; 24 25 import java.util.ArrayList ; 26 import java.util.List ; 27 28 35 public class AgentMessageListener implements MessageListener { 36 private List messages = new CopyOnWriteArrayList (); 37 private Object semaphore = new Object ();; 38 39 public void stop() { 40 messages.clear(); 41 } 42 43 46 public List flushMessages() { 47 List answer = new ArrayList (messages); 48 messages.clear(); 49 return answer; 50 } 51 52 public void onMessage(Message message) { 53 System.out.println("Received message: " + message); 54 55 messages.add(message); 56 57 synchronized (semaphore) { 58 semaphore.notifyAll(); 59 } 60 } 61 62 public void waitForMessageToArrive() { 63 System.out.println("Waiting for message to arrive"); 64 65 long start = System.currentTimeMillis(); 66 67 try { 68 if (hasReceivedMessage()) { 69 synchronized (semaphore) { 70 semaphore.wait(4000); 71 } 72 } 73 } 74 catch (InterruptedException e) { 75 System.out.println("Caught: " + e); 76 } 77 long end = System.currentTimeMillis() - start; 78 79 System.out.println("End of wait for " + end + " millis"); 80 } 81 82 public void waitForMessagesToArrive(int messageCount) { 83 System.out.println("Waiting for message to arrive"); 84 85 long start = System.currentTimeMillis(); 86 87 for (int i = 0; i < 10; i++) { 88 try { 89 if (hasReceivedMessages(messageCount)) { 90 break; 91 } 92 synchronized (semaphore) { 93 semaphore.wait(1000); 94 } 95 } 96 catch (InterruptedException e) { 97 System.out.println("Caught: " + e); 98 } 99 } 100 long end = System.currentTimeMillis() - start; 101 102 System.out.println("End of wait for " + end + " millis"); 103 } 104 105 protected boolean hasReceivedMessage() { 106 return messages.isEmpty(); 107 } 108 109 protected boolean hasReceivedMessages(int messageCount) { 110 return messages.size() >= messageCount; 111 } 112 113 } 114 | Popular Tags |