1 18 package org.apache.activemq; 19 20 import java.util.Enumeration ; 21 22 import javax.jms.IllegalStateException ; 23 import javax.jms.JMSException ; 24 import javax.jms.Message ; 25 import javax.jms.Queue ; 26 import javax.jms.QueueBrowser ; 27 28 import org.apache.activemq.command.ActiveMQDestination; 29 import org.apache.activemq.command.ConsumerId; 30 import org.apache.activemq.command.MessageDispatch; 31 32 import java.util.concurrent.atomic.AtomicBoolean ; 33 34 59 60 public class ActiveMQQueueBrowser implements 61 QueueBrowser , Enumeration { 62 63 private final ActiveMQSession session; 64 private final ActiveMQDestination destination; 65 private final String selector; 66 67 private ActiveMQMessageConsumer consumer; 68 private boolean closed; 69 private final ConsumerId consumerId; 70 private final AtomicBoolean browseDone = new AtomicBoolean (true); 71 private final boolean dispatchAsync; 72 private Object semaphore = new Object (); 73 74 82 protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException { 83 this.session = session; 84 this.consumerId = consumerId; 85 this.destination = destination; 86 this.selector = selector; 87 this.dispatchAsync=dispatchAsync; 88 this.consumer = createConsumer(); 89 } 90 91 99 private ActiveMQMessageConsumer createConsumer() throws JMSException { 100 browseDone.set(false); 101 ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy(); 102 return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), 103 prefetchPolicy.getMaximumPendingMessageLimit(), false, true, dispatchAsync) { 104 public void dispatch(MessageDispatch md) { 105 if( md.getMessage()==null ) { 106 browseDone.set(true); 107 } else { 108 super.dispatch(md); 109 } 110 notifyMessageAvailable(); 111 } 112 }; 113 } 114 115 private void destroyConsumer() { 116 if( consumer == null ) 117 return; 118 try { 119 consumer.close(); 120 consumer=null; 121 } catch (JMSException e) { 122 e.printStackTrace(); 123 } 124 } 125 126 134 135 public Enumeration getEnumeration() throws JMSException { 136 checkClosed(); 137 if( consumer==null ) 138 consumer = createConsumer(); 139 return this; 140 } 141 142 private void checkClosed() throws IllegalStateException { 143 if (closed) { 144 throw new IllegalStateException ("The Consumer is closed"); 145 } 146 } 147 148 151 public boolean hasMoreElements() { 152 while( true ) { 153 154 synchronized(this) { 155 if( consumer==null ) 156 return false; 157 } 158 159 if( consumer.getMessageSize() > 0 ) { 160 return true; 161 } 162 163 if( browseDone.get() || !session.isRunning() ) { 164 destroyConsumer(); 165 return false; 166 } 167 168 waitForMessage(); 169 } 170 } 171 172 173 176 public Object nextElement() { 177 while( true ) { 178 179 synchronized(this) { 180 if( consumer==null ) 181 return null; 182 } 183 184 try { 185 Message answer = consumer.receiveNoWait(); 186 if( answer!=null ) 187 return answer; 188 } catch (JMSException e) { 189 this.session.connection.onAsyncException(e); 190 return null; 191 } 192 193 if( browseDone.get() || !session.isRunning() ) { 194 destroyConsumer(); 195 return null; 196 } 197 198 waitForMessage(); 199 } 200 } 201 202 synchronized public void close() throws JMSException { 203 destroyConsumer(); 204 closed=true; 205 } 206 207 214 215 public Queue getQueue() throws JMSException { 216 return (Queue ) destination; 217 } 218 219 220 public String getMessageSelector() throws JMSException { 221 return selector; 222 } 223 224 225 228 231 protected void waitForMessage() { 232 try { 233 synchronized (semaphore) { 234 semaphore.wait(2000); 235 } 236 } 237 catch (InterruptedException e) { 238 Thread.currentThread().interrupt(); 239 } 240 } 241 242 243 protected void notifyMessageAvailable() { 244 synchronized (semaphore ) { 245 semaphore.notifyAll(); 246 } 247 } 248 249 public String toString() { 250 return "ActiveMQQueueBrowser { value=" +consumerId+" }"; 251 } 252 253 } 254 | Popular Tags |