1 23 package org.objectweb.joram.client.jms; 24 25 import java.util.Vector ; 26 27 import javax.jms.MessageListener ; 28 import javax.jms.JMSException ; 29 30 import org.objectweb.joram.shared.client.AbstractJmsReply; 31 import org.objectweb.joram.shared.client.ConsumerCloseSubRequest; 32 import org.objectweb.joram.shared.client.ConsumerMessages; 33 import org.objectweb.joram.shared.client.ConsumerSetListRequest; 34 import org.objectweb.joram.shared.client.ConsumerUnsetListRequest; 35 import org.objectweb.joram.shared.client.ConsumerAckRequest; 36 import org.objectweb.joram.shared.client.ActivateConsumerRequest; 37 import org.objectweb.joram.shared.client.ConsumerUnsubRequest; 38 import org.objectweb.joram.client.jms.connection.ReplyListener; 39 import org.objectweb.joram.client.jms.connection.AbortedRequestException; 40 import org.objectweb.joram.client.jms.connection.RequestMultiplexer; 41 42 import fr.dyade.aaa.util.StoppedQueueException; 43 44 import org.objectweb.util.monolog.api.BasicLevel; 45 import org.objectweb.util.monolog.api.Logger; 46 import fr.dyade.aaa.util.Debug; 47 48 52 abstract class MessageConsumerListener implements ReplyListener { 53 54 public static Logger logger = 55 Debug.getLogger(MessageConsumerListener.class.getName()); 56 57 60 protected static class Status { 61 public static final int INIT = 0; 62 public static final int RUN = 1; 63 public static final int ON_MSG = 2; 64 public static final int CLOSE = 3; 65 66 private static final String [] names = { 67 "INIT", "RUN", "ON_MSG", "CLOSE"}; 68 69 public static String toString(int status) { 70 return names[status]; 71 } 72 } 73 74 private static class ReceiveStatus { 75 public static final int INIT = 0; 76 77 public static final int WAIT_FOR_REPLY = 1; 78 79 public static final int CONSUMING_REPLY = 2; 80 81 private static final String [] names = { 82 "INIT", "WAIT_FOR_REPLY", "CONSUMING_REPLY" }; 83 84 public static String toString(int status) { 85 return names[status]; 86 } 87 } 88 89 private boolean queueMode; 90 91 private boolean durable; 92 93 private String selector; 94 95 private String targetName; 96 97 100 private volatile int requestId; 101 102 private int status; 103 104 private Vector messagesToAck; 105 106 110 private volatile int messageCount; 111 112 118 private volatile int receiveStatus; 119 120 123 private boolean topicMsgInputPassivated; 124 125 private int queueMessageReadMax; 126 127 private RequestMultiplexer rm; 128 129 private int topicActivationThreshold; 130 131 private int topicPassivationThreshold; 132 133 private int topicAckBufferMax; 134 135 private MessageListener listener; 136 137 MessageConsumerListener(boolean queueMode, 138 boolean durable, 139 String selector, 140 String targetName, 141 MessageListener listener, 142 int queueMessageReadMax, 143 int topicActivationThreshold, 144 int topicPassivationThreshold, 145 int topicAckBufferMax, 146 RequestMultiplexer reqMultiplexer) { 147 if (logger.isLoggable(BasicLevel.DEBUG)) 148 logger.log(BasicLevel.DEBUG, 149 "MessageConsumerListener(" + queueMode + 150 ',' + durable + ',' + selector + ',' + targetName + 151 ',' + listener + ',' + queueMessageReadMax + 152 ',' + topicActivationThreshold + 153 ',' + topicPassivationThreshold + 154 ',' + topicAckBufferMax + ',' + reqMultiplexer + ')'); 155 this.queueMode = queueMode; 156 this.durable = durable; 157 this.selector = selector; 158 this.targetName = targetName; 159 this.listener = listener; 160 this.queueMessageReadMax = queueMessageReadMax; 161 this.topicActivationThreshold = topicActivationThreshold; 162 this.topicPassivationThreshold = topicPassivationThreshold; 163 this.topicAckBufferMax = topicAckBufferMax; 164 rm = reqMultiplexer; 165 messagesToAck = new Vector (0); 166 requestId = -1; 167 messageCount = 0; 168 topicMsgInputPassivated = false; 169 setStatus(Status.INIT); 170 setReceiveStatus(ReceiveStatus.INIT); 171 } 172 173 protected final int getStatus() { 174 return status; 175 } 176 177 protected void setStatus(int status) { 178 if (logger.isLoggable(BasicLevel.DEBUG)) 179 logger.log(BasicLevel.DEBUG, 180 "MessageConsumerListener.setStatus(" + Status.toString(status) + ')'); 181 this.status = status; 182 } 183 184 private void setReceiveStatus(int s) { 185 if (logger.isLoggable(BasicLevel.DEBUG)) 186 logger.log(BasicLevel.DEBUG, 187 "MessageConsumerListener.setReceiveStatus(" + ReceiveStatus.toString(s) + ')'); 188 receiveStatus = s; 189 } 190 191 198 private int decreaseMessageCount(int ackMode) throws JMSException { 199 if (logger.isLoggable(BasicLevel.DEBUG)) 200 logger.log(BasicLevel.DEBUG, 201 "MessageConsumerListener.decreaseMessageCount()"); 202 203 synchronized (this) { 204 messageCount--; 205 } 206 207 if (queueMode) { 208 boolean subscribe = false; 209 String [] toAck = null; 210 synchronized (this) { 211 if (logger.isLoggable(BasicLevel.DEBUG)) 212 logger.log(BasicLevel.DEBUG, " -> messageCount = " + messageCount); 213 if (messageCount < queueMessageReadMax 215 && receiveStatus == ReceiveStatus.CONSUMING_REPLY) { 216 subscribe = true; 217 if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE) { 218 synchronized (messagesToAck) { 219 if (messagesToAck.size() > 0) { 220 toAck = new String [messagesToAck.size()]; 221 messagesToAck.copyInto(toAck); 222 messagesToAck.clear(); 223 } 224 } 225 } 226 } 227 } 228 if (subscribe) { 229 subscribe(toAck); 231 } 232 } else { 233 synchronized (this) { 234 if (topicMsgInputPassivated) { 235 if (messageCount < topicActivationThreshold) { 236 activateMessageInput(); 237 topicMsgInputPassivated = false; 238 } 239 } else { 240 if (messageCount > topicPassivationThreshold) { 241 passivateMessageInput(); 242 topicMsgInputPassivated = true; 243 } 244 } 245 } 246 } 247 248 if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE 249 && messageCount == 0) { 250 acknowledge(0); 253 } 254 255 return messageCount; 256 } 257 258 261 synchronized void start() throws JMSException { 262 if (logger.isLoggable(BasicLevel.DEBUG)) 263 logger.log( 264 BasicLevel.DEBUG, "MessageConsumerListener.start()"); 265 if (status == Status.INIT) { 266 subscribe(null); 267 setStatus(Status.RUN); 268 } else { 269 throw new IllegalStateException ("Status error"); 271 } 272 } 273 274 private void subscribe(String [] toAck) throws JMSException { 275 if (logger.isLoggable(BasicLevel.DEBUG)) 276 logger.log( 277 BasicLevel.DEBUG, "MessageConsumerListener.subscribe()"); 278 279 ConsumerSetListRequest req = 280 new ConsumerSetListRequest( 281 targetName, 282 selector, 283 queueMode, 284 toAck, 285 queueMessageReadMax); 286 287 setReceiveStatus(ReceiveStatus.WAIT_FOR_REPLY); 292 rm.sendRequest(req, this); 293 requestId = req.getRequestId(); 294 } 295 296 299 public void close() throws JMSException { 300 if (logger.isLoggable(BasicLevel.DEBUG)) 301 logger.log( 302 BasicLevel.DEBUG, "MessageConsumerListener.close()"); 303 304 synchronized (this) { 305 while (status == Status.ON_MSG) { 306 try { 307 wait(); 310 } catch (InterruptedException exc) {} 311 } 312 313 if (status == Status.INIT || 314 status == Status.CLOSE) return; 315 316 rm.abortRequest(requestId); 317 318 acknowledge(0); 320 321 setStatus(Status.CLOSE); 322 } 323 324 if (queueMode) { 325 ConsumerUnsetListRequest unsetLR = new ConsumerUnsetListRequest( 329 queueMode); 330 unsetLR.setTarget(targetName); 331 unsetLR.setCancelledRequestId(requestId); 332 rm.sendRequest(unsetLR); 333 } 334 } 338 339 private void acknowledge(int threshold) { 340 try { 341 synchronized (messagesToAck) { 342 if (messagesToAck.size() > threshold) { 343 ConsumerAckRequest ack = new ConsumerAckRequest( 344 targetName, 345 queueMode); 346 for (int i = 0; i < messagesToAck.size(); i++) { 347 String msgId = (String ) messagesToAck.elementAt(i); 348 ack.addId(msgId); 349 } 350 rm.sendRequest(ack); 351 messagesToAck.clear(); 352 } 353 } 354 } catch (JMSException exc) { 355 if (logger.isLoggable(BasicLevel.ERROR)) 356 logger.log( 357 BasicLevel.ERROR, "", exc); 358 } 359 } 360 361 364 public synchronized boolean replyReceived(AbstractJmsReply reply) 365 throws AbortedRequestException { 366 if (logger.isLoggable(BasicLevel.DEBUG)) 367 logger.log( 368 BasicLevel.DEBUG, "MessageConsumerListener.replyReceived(" + 369 reply + ')'); 370 371 if (status == Status.CLOSE) { 372 throw new AbortedRequestException(); 373 } else { 374 if (queueMode) { 375 setReceiveStatus(ReceiveStatus.CONSUMING_REPLY); 378 } 379 try { 380 ConsumerMessages cm = (ConsumerMessages)reply; 381 messageCount += cm.getMessageCount(); 383 384 pushMessages(cm); 385 } catch (StoppedQueueException exc) { 386 throw new AbortedRequestException(); 387 } catch (JMSException exc) { 388 throw new AbortedRequestException(); 389 } 390 if (queueMode) { 391 return true; 392 } else { 393 return false; 394 } 395 } 396 } 397 398 408 public abstract void pushMessages(ConsumerMessages cm) throws JMSException ; 409 410 public void replyAborted(int requestId) { 411 } 413 414 public synchronized boolean isClosed() { 415 return (status == Status.CLOSE); 416 } 417 418 public final MessageListener getMessageListener() { 419 return listener; 420 } 421 422 public final boolean getQueueMode() { 423 return queueMode; 424 } 425 426 public final String getTargetName() { 427 return targetName; 428 } 429 430 431 protected void activateListener( 432 Message msg, MessageListener listener, int ackMode) 433 throws JMSException { 434 if (logger.isLoggable(BasicLevel.DEBUG)) 435 logger.log(BasicLevel.DEBUG, 436 "MessageConsumerListener.onMessage(" + msg + ')'); 437 438 decreaseMessageCount(ackMode); 440 441 try { 442 listener.onMessage(msg); 443 444 if (logger.isLoggable(BasicLevel.DEBUG)) 445 logger.log(BasicLevel.DEBUG, 446 " -> consumer.onMessage(" + msg + ") returned"); 447 } catch (RuntimeException re) { 448 if (logger.isLoggable(BasicLevel.DEBUG)) 449 logger.log(BasicLevel.DEBUG, "", re); 450 JMSException exc = new JMSException (re.toString()); 451 exc.setLinkedException(re); 452 throw exc; 453 } 454 } 455 456 public abstract void onMessage( 457 Message msg, MessageListener listener, int ackMode) 458 throws JMSException ; 459 460 463 public void onMessage(Message msg, int ackMode) throws JMSException { 464 if (logger.isLoggable(BasicLevel.DEBUG)) 465 logger.log(BasicLevel.DEBUG, "MessageConsumerListener.onMessage(" + msg + ')'); 466 if (listener != null) { 467 synchronized (this) { 468 if (status == Status.RUN) { 469 setStatus(Status.ON_MSG); 470 } else { 471 notifyAll(); 473 throw new javax.jms.IllegalStateException ("Message listener closed"); 474 } 475 } 476 477 try { 478 activateListener(msg, listener, ackMode); 479 } finally { 480 synchronized (this) { 481 if (status == Status.ON_MSG) 482 setStatus(Status.RUN); 483 484 notifyAll(); 486 } 487 } 488 } else { 489 throw new JMSException ("Null listener"); 490 } 491 } 492 493 void ack(String msgId, int ackMode) 494 throws JMSException { 495 if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE) { 496 messagesToAck.addElement(msgId); 499 if (! queueMode) { 500 acknowledge(topicAckBufferMax); 501 } 502 } else { 503 ConsumerAckRequest ack = new ConsumerAckRequest(targetName, queueMode); 504 ack.addId(msgId); 505 rm.sendRequest(ack); 506 } 507 } 508 509 void activateMessageInput() throws JMSException { 510 rm.sendRequest( 511 new ActivateConsumerRequest(targetName, true)); 512 } 513 514 void passivateMessageInput() throws JMSException { 515 rm.sendRequest( 516 new ActivateConsumerRequest(targetName, false)); 517 } 518 } 519 | Popular Tags |