1 21 package com.presumo.jms.client; 22 23 import com.presumo.util.log.Logger; 24 import com.presumo.util.log.LoggerFactory; 25 import com.presumo.jms.selector.JmsOperand; 26 import com.presumo.jms.selector.Parser; 27 import com.presumo.jms.resources.Resources; 28 29 import java.util.LinkedList ; 30 31 import javax.jms.IllegalStateException ; 32 import javax.jms.InvalidSelectorException ; 33 import javax.jms.JMSException ; 34 import javax.jms.Message ; 35 import javax.jms.MessageConsumer ; 36 import javax.jms.MessageListener ; 37 import javax.jms.Session ; 38 39 47 public abstract class JmsMessageConsumer implements MessageConsumer 48 { 49 50 protected String selector; 54 protected final JmsSession mySession; 55 protected MessageListener messageListener; 56 protected LinkedList inbox; 57 protected volatile boolean closed; 58 59 63 67 private final Object lock = new String ("JmsMessageConsumer Lock"); 68 69 private int consumerID; 70 71 JmsMessageConsumer(JmsSession session, String selector) 75 throws JMSException 76 { 77 logger.entry("JmsMessageConsumer"); 78 79 checkSelector(selector); 80 this.mySession = session; 81 this.selector = selector; 82 this.inbox = new LinkedList (); 83 84 85 88 logger.exit("JmsMessageConsumer"); 89 } 90 91 92 93 97 98 101 public final String getMessageSelector() throws JMSException 102 { 103 return this.selector; 104 } 105 106 107 110 public final MessageListener getMessageListener() 111 { 112 return this.messageListener; 113 } 114 115 116 119 public final void setMessageListener(MessageListener listener) 120 throws JMSException 121 { 122 logger.entry("setMessageListener"); 123 124 if (listener == null) 125 throw new IllegalArgumentException ("Attempt to register null MessageListener"); 126 127 if (messageListener == null && listener != null) { 130 mySession.addAsynch(); 131 } 132 133 this.messageListener = listener; 134 135 logger.exit("setMessageListener"); 136 } 137 138 139 142 public final Message receive() throws JMSException 143 { 144 logger.entry("receive()"); 145 146 if (closed == true) 147 throw new IllegalStateException ("receive called on a closed consumer"); 148 149 if (messageListener != null || mySession.hasAsynchronousListeners()) 150 throw new IllegalStateException 151 ("receive called on a Session with an asynchronous listener"); 152 153 Message message = null; 154 155 synchronized (lock) { 156 157 while (message == null && closed == false) { 158 if (inbox.size() != 0) { 159 message = (Message ) inbox.removeFirst(); 160 } 161 else { 162 try { 163 lock.wait(); 164 } catch (InterruptedException ie) {} 165 } 166 } 167 } 168 169 170 if (mySession.autoAcknowledge()) { 171 mySession.acknowledge(); 172 } 173 174 logger.exit("receive()"); 175 return message; 176 } 177 178 179 182 public final Message receive(long timeout) throws JMSException 183 { 184 logger.entry("receive(long)"); 185 186 if (closed == true) 187 throw new IllegalStateException ("receive called on a closed consumer"); 188 189 if (messageListener != null || mySession.hasAsynchronousListeners()) 190 throw new IllegalStateException 191 ("receive called on a Session with an asynchronous listener"); 192 193 Message message = null; 194 195 synchronized (lock) { 196 197 if (inbox.size() != 0) 198 message = (Message ) inbox.removeFirst(); 199 else { 200 try { 201 lock.wait(timeout); 202 } catch (InterruptedException ie) {} 203 204 if (inbox.size() != 0) 205 message = (Message ) inbox.removeFirst(); 206 } 207 } 208 209 210 if (mySession.autoAcknowledge()) { 211 mySession.acknowledge(); 212 } 213 214 logger.exit("receive(long)", message); 215 return message; 216 } 217 218 219 222 public final Message receiveNoWait() throws JMSException 223 { 224 return this.receive(1); 227 } 228 229 230 233 public final void close() throws JMSException 234 { 235 logger.entry("close"); 236 237 if (!closed) { 238 synchronized (lock) { 240 closed = true; 241 242 mySession.removeConsumer(this); 243 if (messageListener != null) { 244 mySession.removeAsynch(); 245 messageListener = null; 246 } 247 else { 248 lock.notifyAll(); 249 } 250 } 251 } 252 253 logger.exit("close"); 254 } 255 256 257 262 public final int getConsumerID() 263 { 264 return this.consumerID; 265 } 266 267 272 public final void setConsumerID(int id) 273 { 274 this.consumerID = id; 275 } 276 277 abstract JmsOperand getFilter(); 281 282 285 final void takeMessage(Message message) 286 { 287 logger.entry("consumer-takeMessage", message); 288 289 synchronized (lock) { 290 291 if (closed) return; 292 293 if (messageListener == null) { 294 inbox.add(message); 295 lock.notifyAll(); 296 } 297 else { 298 299 if (inbox != null) { 304 while (inbox.size() > 0 ) { 305 try { 306 messageListener.onMessage((Message )inbox.removeFirst()); 307 308 if (mySession.autoAcknowledge()) { 309 mySession.acknowledge(); 310 } 311 } catch (RuntimeException e) { mySession.reportException(e); } 312 } 313 inbox = null; 314 } 315 316 try { 318 messageListener.onMessage(message); 319 320 if (mySession.autoAcknowledge()) { 321 mySession.acknowledge(); 322 } 323 } catch (RuntimeException e) { mySession.reportException(e); } 324 } 325 } 326 327 logger.exit("consumer-takeMessage"); 328 } 329 333 337 protected final String createUniqueID() 338 { 339 throw new RuntimeException ("Not implemented"); 340 } 341 342 346 protected final String generateSystemFilter(String queueName, String id) 347 { 348 StringBuffer buffer = new StringBuffer (); 349 buffer.append("QueueName = \'"); 350 buffer.append(queueName); 351 buffer.append("\' AND ReceiverID = \'"); 352 buffer.append(id); 353 buffer.append('\''); 354 return buffer.toString(); 355 } 356 357 358 359 363 366 private void checkSelector(String selector) 367 throws InvalidSelectorException 368 { 369 if (selector != null && selector.length() != 0) { 370 Parser parser = Parser.getInstance(); 371 JmsOperand o = parser.parseFilter(selector); 372 parser.delete(o); 373 } 374 } 375 376 378 private static Logger logger = 379 LoggerFactory.getLogger(JmsMessageConsumer.class, Resources.getBundle()); 380 381 383 } 384 385 386 387 388 389 | Popular Tags |