package org.apache.activemq.tool;

import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A JMS {@link MessageListener} that records the JMSMessageID of every message
 * it receives and lets callers wait until an expected number of messages has
 * been recorded.
 * <p>
 * All access to the recorded IDs is guarded by the monitor of the
 * {@code semaphore} object, which may be supplied by the caller. The
 * {@code verbose} and {@code parent} settings are plain fields and are not
 * guarded by that monitor.
 */
public class MemMessageIdList implements MessageListener {

    protected static final Log log = LogFactory.getLog(MemMessageIdList.class);

    /** Recorded message IDs, in arrival order; guarded by {@link #semaphore}. */
    private List messageIds = new ArrayList();
    /** Monitor used both as the lock for {@link #messageIds} and for wait/notify. */
    private Object semaphore;
    private boolean verbose;
    private MessageListener parent;
    /** Upper bound, in milliseconds, on how long {@link #waitForMessagesToArrive(int)} blocks. */
    private long maximumDuration = 15000L;

    /**
     * Creates a list that uses a private monitor object.
     */
    public MemMessageIdList() {
        this(new Object());
    }

    /**
     * Creates a list that locks and waits on the given monitor object.
     *
     * @param semaphore the object used for synchronization and notification
     */
    public MemMessageIdList(Object semaphore) {
        this.semaphore = semaphore;
    }

    /**
     * Two lists are equal when they have recorded the same IDs in the same order.
     *
     * @param that the object to compare with
     * @return {@code true} if {@code that} is a {@code MemMessageIdList} holding
     *         an equal sequence of message IDs
     */
    public boolean equals(Object that) {
        if (this == that) {
            return true;
        }
        if (that instanceof MemMessageIdList) {
            MemMessageIdList thatListMem = (MemMessageIdList) that;
            // Compare snapshots so that neither monitor is held while the other is acquired.
            return getMessageIds().equals(thatListMem.getMessageIds());
        }
        return false;
    }

    /**
     * @return a hash code derived from the recorded message IDs
     */
    public int hashCode() {
        synchronized (semaphore) {
            return messageIds.hashCode() + 1;
        }
    }

    /**
     * @return the string form of the recorded message IDs
     */
    public String toString() {
        synchronized (semaphore) {
            return messageIds.toString();
        }
    }

    /**
     * Returns the IDs recorded so far and clears the internal list.
     *
     * @return a new list holding the IDs that were recorded before the call
     */
    public List flushMessages() {
        synchronized (semaphore) {
            List answer = new ArrayList(messageIds);
            messageIds.clear();
            return answer;
        }
    }

    /**
     * Returns a snapshot of the recorded IDs.
     * <p>
     * Only the semaphore monitor is taken. Locking the instance as well would
     * add nothing and would impose a this-then-semaphore lock order that can
     * deadlock with code already holding a shared semaphore.
     *
     * @return a new list holding a copy of the recorded message IDs
     */
    public List getMessageIds() {
        synchronized (semaphore) {
            return new ArrayList(messageIds);
        }
    }

    /**
     * Records the message's JMSMessageID, wakes up any waiting threads and then
     * forwards the message to the parent listener, if one is set.
     *
     * @param message the message that was delivered
     */
    public void onMessage(Message message) {
        try {
            String id = message.getJMSMessageID();
            synchronized (semaphore) {
                messageIds.add(id);
                semaphore.notifyAll();
            }
            if (verbose) {
                log.info("Received message: " + message);
            }
        } catch (JMSException e) {
            // Report through the class logger rather than stderr; the message is
            // still forwarded to the parent below.
            log.error("Failed to read the JMSMessageID of message: " + message, e);
        }
        // The parent is invoked outside the lock so foreign code never runs under it.
        if (parent != null) {
            parent.onMessage(message);
        }
    }

    /**
     * @return the number of message IDs currently recorded
     */
    public int getMessageCount() {
        synchronized (semaphore) {
            return messageIds.size();
        }
    }

    /**
     * Blocks until at least {@code messageCount} messages have been recorded,
     * the maximum wait duration has elapsed, or the calling thread is
     * interrupted, whichever happens first.
     * <p>
     * If the thread is interrupted, its interrupt status is restored before
     * this method returns.
     *
     * @param messageCount the number of messages to wait for
     */
    public void waitForMessagesToArrive(int messageCount) {
        log.info("Waiting for " + messageCount + " message(s) to arrive");

        long start = System.currentTimeMillis();

        // The count is checked while holding the same monitor that onMessage()
        // notifies on, so a message arriving between the check and the wait
        // cannot be missed.
        synchronized (semaphore) {
            while (!hasReceivedMessages(messageCount)) {
                long remaining = maximumDuration - (System.currentTimeMillis() - start);
                if (remaining <= 0) {
                    break;
                }
                try {
                    // remaining is strictly positive here; wait(0) would block forever.
                    semaphore.wait(remaining);
                } catch (InterruptedException e) {
                    log.info("Caught: " + e);
                    // Preserve the interrupt for the caller and stop waiting;
                    // waiting again would just throw immediately.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        long end = System.currentTimeMillis() - start;

        log.info("End of wait for " + end + " millis and received: " + getMessageCount() + " messages");
    }

    /**
     * @return {@code true} if at least one message has been recorded
     */
    public boolean hasReceivedMessage() {
        return getMessageCount() > 0;
    }

    /**
     * @param messageCount the number of messages expected
     * @return {@code true} if at least {@code messageCount} messages have been recorded
     */
    public boolean hasReceivedMessages(int messageCount) {
        return getMessageCount() >= messageCount;
    }

    /**
     * @return whether each received message is logged
     */
    public boolean isVerbose() {
        return verbose;
    }

    /**
     * @param verbose {@code true} to log each received message at info level
     */
    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }

    /**
     * @return the listener that received messages are forwarded to, or
     *         {@code null} if none is set
     */
    public MessageListener getParent() {
        return parent;
    }

    /**
     * Sets a listener that {@link #onMessage(Message)} forwards every message
     * to after recording it.
     *
     * @param parent the listener to forward to, or {@code null} for none
     */
    public void setParent(MessageListener parent) {
        this.parent = parent;
    }

}