1 24 package org.objectweb.joram.client.jms; 25 26 import javax.jms.IllegalStateException ; 27 import javax.jms.InvalidDestinationException ; 28 import javax.jms.InvalidSelectorException ; 29 import javax.jms.JMSException ; 30 import javax.jms.JMSSecurityException ; 31 32 import org.objectweb.joram.shared.client.ConsumerCloseSubRequest; 33 import org.objectweb.joram.shared.client.ConsumerSubRequest; 34 import org.objectweb.joram.shared.client.ConsumerUnsubRequest; 35 36 import org.objectweb.joram.shared.selectors.ClientSelector; 37 38 import org.objectweb.util.monolog.api.BasicLevel; 39 import org.objectweb.joram.shared.JoramTracing; 40 41 44 public class MessageConsumer implements javax.jms.MessageConsumer { 45 48 private static class Status { 49 53 public static final int OPEN = 0; 54 55 59 public static final int CLOSE = 1; 60 61 private static final String [] names = { 62 "OPEN", "CLOSE"}; 63 64 public static String toString(int status) { 65 return names[status]; 66 } 67 } 68 69 70 String selector; 71 72 73 private boolean durableSubscriber; 74 75 76 protected Destination dest; 77 78 82 protected boolean noLocal; 83 84 85 protected Session sess; 86 87 91 String targetName; 92 93 94 boolean queueMode; 95 96 99 private MessageConsumerListener mcl; 100 101 105 private int status; 106 107 111 private Closer closer; 112 113 129 MessageConsumer(Session sess, 130 Destination dest, 131 String selector, 132 String subName, 133 boolean noLocal) throws JMSException { 134 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 135 JoramTracing.dbgClient.log( 136 BasicLevel.DEBUG, 137 "MessageConsumer.<init>(" + 138 sess + ',' + dest + ',' + selector + ',' + 139 subName + ',' + noLocal + ')'); 140 141 if (dest == null) 142 throw new InvalidDestinationException ("Invalid null destination."); 143 144 if (dest instanceof TemporaryQueue) { 145 Connection tempQCnx = ((TemporaryQueue) dest).getCnx(); 146 147 if (tempQCnx == null || ! tempQCnx.equals(sess.getConnection())) 148 throw new JMSSecurityException ("Forbidden consumer on this " 149 + "temporary destination."); 150 } 151 else if (dest instanceof TemporaryTopic) { 152 Connection tempTCnx = ((TemporaryTopic) dest).getCnx(); 153 154 if (tempTCnx == null || ! tempTCnx.equals(sess.getConnection())) 155 throw new JMSSecurityException ("Forbidden consumer on this " 156 + "temporary destination."); 157 } 158 159 try { 160 ClientSelector.checks(selector); 161 } catch (org.objectweb.joram.shared.excepts.SelectorException sE) { 162 throw new InvalidSelectorException ("Invalid selector syntax: " + sE); 163 } 164 165 if (dest instanceof javax.jms.Topic ) { 167 if (subName == null) { 168 subName = sess.getConnection().nextSubName(); 169 durableSubscriber = false; 170 } else { 171 durableSubscriber = true; 172 } 173 sess.syncRequest( 174 new ConsumerSubRequest(dest.getName(), 175 subName, 176 selector, 177 noLocal, 178 durableSubscriber)); 179 targetName = subName; 180 this.noLocal = noLocal; 181 queueMode = false; 182 } else { 183 targetName = dest.getName(); 184 queueMode = true; 185 } 186 187 this.sess = sess; 188 this.dest = dest; 189 this.selector = selector; 190 191 closer = new Closer(); 192 193 setStatus(Status.OPEN); 194 } 195 196 209 MessageConsumer(Session sess, 210 Destination dest, 211 String selector) throws JMSException { 212 this(sess, dest, selector, null, false); 213 } 214 215 private synchronized void setStatus(int status) { 216 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 217 JoramTracing.dbgClient.log( 218 BasicLevel.DEBUG, 219 "MessageConsumer.setStatus(" + Status.toString(status) + ')'); 220 this.status = status; 221 } 222 223 public final String getTargetName() { 224 return targetName; 225 } 226 227 public final boolean getQueueMode() { 228 return queueMode; 229 } 230 231 protected synchronized void checkClosed() 232 throws IllegalStateException { 233 if (status == Status.CLOSE) 234 throw new IllegalStateException ("Forbidden call on a closed consumer."); 235 } 236 237 238 public String toString() { 239 return "Consumer:" + sess.getId(); 240 } 241 242 259 public synchronized void setMessageListener( 260 javax.jms.MessageListener messageListener) 261 throws JMSException { 262 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 263 JoramTracing.dbgClient.log( 264 BasicLevel.DEBUG, 265 "MessageConsumer.setMessageListener(" + 266 messageListener + ')'); 267 checkClosed(); 268 if (mcl != null) { 269 if (messageListener == null) { 270 sess.removeMessageListener(mcl, true); 271 mcl = null; 272 } else throw new IllegalStateException ( 273 "Message listener not null"); 274 } else { 275 if (messageListener != null) { 276 mcl = sess.addMessageListener( 277 new SingleSessionConsumer( 278 queueMode, 279 durableSubscriber, 280 selector, 281 targetName, 282 sess, 283 messageListener, 284 sess.getQueueMessageReadMax(), 285 sess.getTopicActivationThreshold(), 286 sess.getTopicPassivationThreshold(), 287 sess.getTopicAckBufferMax(), 288 sess.getRequestMultiplexer())); 289 } 290 } 292 } 293 294 300 public synchronized javax.jms.MessageListener getMessageListener() 301 throws JMSException { 302 checkClosed(); 303 if (mcl == null) return null; 304 return mcl.getMessageListener(); 305 } 306 307 313 public final String getMessageSelector() 314 throws JMSException { 315 checkClosed(); 316 return selector; 317 } 318 319 329 public javax.jms.Message receive(long timeOut) 330 throws JMSException { 331 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 332 JoramTracing.dbgClient.log( 333 BasicLevel.DEBUG, 334 "MessageConsumer.receive(" + timeOut + ')'); 335 checkClosed(); 336 return sess.receive(timeOut, timeOut, this, 337 targetName, selector, queueMode); 338 } 339 340 350 public javax.jms.Message receive() 351 throws JMSException { 352 return receive(0); 353 } 354 355 365 public javax.jms.Message receiveNoWait() 366 throws JMSException { 367 checkClosed(); 368 if (sess.getConnection().isStopped()) { 369 return null; 370 } else { 371 return sess.receive(-1, 0, this, 372 targetName, selector, queueMode); 373 } 374 } 375 376 381 public void close() throws JMSException { 382 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 383 JoramTracing.dbgClient.log( 384 BasicLevel.DEBUG, 385 "MessageConsumer.close()"); 386 closer.close(); 387 } 388 389 396 class Closer { 397 synchronized void close() throws JMSException { 398 doClose(); 399 } 400 } 401 402 void doClose() throws JMSException { 403 synchronized (this) { 404 if (status == Status.CLOSE) 405 return; 406 setStatus(Status.CLOSE); 411 } 412 413 if (!queueMode) { 414 if (durableSubscriber) { 416 try { 417 sess.syncRequest(new ConsumerCloseSubRequest(targetName)); 418 } catch (JMSException exc) { 419 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 420 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc); 421 } 422 } else { 423 try { 424 sess.syncRequest(new ConsumerUnsubRequest(targetName)); 425 } catch (JMSException exc) { 426 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 427 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc); 428 } 429 } 430 } 431 432 sess.closeConsumer(this); 433 434 if (mcl != null) { 435 sess.removeMessageListener(mcl, false); 437 } 438 } 439 440 void activateMessageInput() throws JMSException { 441 if (mcl != null) 442 mcl.activateMessageInput(); 443 } 444 445 void passivateMessageInput() throws JMSException { 446 if (mcl != null) 447 mcl.passivateMessageInput(); 448 } 449 } 450 | Popular Tags |