1 24 package com.scalagent.kjoram; 25 26 import com.scalagent.kjoram.jms.*; 27 import com.scalagent.kjoram.util.TimerTask; 28 29 import java.util.Vector ; 30 31 import com.scalagent.kjoram.excepts.IllegalStateException; 32 import com.scalagent.kjoram.excepts.*; 33 34 35 public class MessageConsumer 36 { 37 38 private String selector; 39 40 private MessageListener messageListener = null; 41 42 private boolean durableSubscriber; 43 44 private AbstractJmsRequest pendingReq = null; 45 49 private boolean receiving = false; 50 51 private TimerTask replyingTask = null; 52 53 54 protected Destination dest; 55 59 protected boolean noLocal; 60 61 protected boolean closed = false; 62 63 64 Session sess; 65 69 String targetName; 70 71 boolean queueMode; 72 73 87 MessageConsumer(Session sess, Destination dest, String selector, 88 String subName, boolean noLocal) throws JMSException 89 { 90 if (dest == null) 91 throw new InvalidDestinationException("Invalid null destination."); 92 93 if (dest instanceof TemporaryQueue) { 94 Connection tempQCnx = ((TemporaryQueue) dest).getCnx(); 95 96 if (tempQCnx == null || ! tempQCnx.equals(sess.cnx)) 97 throw new JMSSecurityException("Forbidden consumer on this " 98 + "temporary destination."); 99 } 100 else if (dest instanceof TemporaryTopic) { 101 Connection tempTCnx = ((TemporaryTopic) dest).getCnx(); 102 103 if (tempTCnx == null || ! tempTCnx.equals(sess.cnx)) 104 throw new JMSSecurityException("Forbidden consumer on this " 105 + "temporary destination."); 106 } 107 108 if (dest instanceof Topic) { 110 if (subName == null) { 111 subName = sess.cnx.nextSubName(); 112 durableSubscriber = false; 113 } 114 else 115 durableSubscriber = true; 116 117 sess.cnx.syncRequest(new ConsumerSubRequest(dest.getName(), 118 subName, 119 selector, 120 noLocal, 121 durableSubscriber)); 122 targetName = subName; 123 this.noLocal = noLocal; 124 queueMode = false; 125 } 126 else { 127 targetName = dest.getName(); 128 queueMode = true; 129 } 130 131 this.sess = sess; 132 this.dest = dest; 133 this.selector = selector; 134 135 sess.consumers.addElement(this); 136 137 if (JoramTracing.dbgClient) 138 JoramTracing.log(JoramTracing.DEBUG, this + ": created."); 139 } 140 141 152 MessageConsumer(Session sess, Destination dest, 153 String selector) throws JMSException 154 { 155 this(sess, dest, selector, null, false); 156 } 157 158 159 public String toString() 160 { 161 return "Consumer:" + sess.ident; 162 } 163 164 181 public void setMessageListener(MessageListener messageListener) 182 throws JMSException 183 { 184 if (closed) 185 throw new IllegalStateException ("Forbidden call on a closed consumer."); 186 187 if (JoramTracing.dbgClient) 188 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 189 + ": setting MessageListener to " 190 + messageListener); 191 192 if (sess.cnx.started && JoramTracing.dbgClient) 193 JoramTracing.log(JoramTracing.WARN, this + ": improper call" 194 + " on a started connection."); 195 196 if (this.messageListener != null && messageListener == null) { 198 if (JoramTracing.dbgClient) 199 JoramTracing.log(JoramTracing.DEBUG, this + ": unsets" 200 + " listener request."); 201 202 sess.cnx.requestsTable.remove(pendingReq.getKey()); 203 204 this.messageListener = messageListener; 205 sess.msgListeners--; 206 207 ConsumerUnsetListRequest unsetLR = null; 208 if (queueMode) { 209 unsetLR = new ConsumerUnsetListRequest(true); 210 unsetLR.setCancelledRequestId(pendingReq.getRequestId()); 211 } 212 else { 213 unsetLR = new ConsumerUnsetListRequest(false); 214 unsetLR.setTarget(targetName); 215 } 216 217 try { 218 sess.cnx.syncRequest(unsetLR); 219 } 220 catch (JMSException jE) {} 222 pendingReq = null; 223 224 if (sess.msgListeners == 0 && sess.started) { 226 if (JoramTracing.dbgClient) 227 JoramTracing.log(JoramTracing.DEBUG, this + ": stops the" 228 + " session daemon."); 229 sess.daemon.stop(); 230 sess.daemon = null; 231 sess.started = false; 232 } 233 } 234 else if (this.messageListener == null && messageListener != null) { 236 sess.msgListeners++; 237 238 if (sess.msgListeners == 1 239 && (sess.started || sess.cnx.started)) { 240 if (JoramTracing.dbgClient) 241 JoramTracing.log(JoramTracing.DEBUG, this + ": starts the" 242 + " session daemon."); 243 sess.daemon = new SessionDaemon(sess); 244 sess.daemon.setDaemon(false); 245 sess.daemon.start(); 246 sess.started = true; 247 } 248 249 this.messageListener = messageListener; 250 pendingReq = new ConsumerSetListRequest(targetName, selector, queueMode); 251 pendingReq.setRequestId(sess.cnx.nextRequestId()); 252 sess.cnx.requestsTable.put(pendingReq.getKey(), this); 253 sess.cnx.asyncRequest(pendingReq); 254 } 255 256 if (JoramTracing.dbgClient) 257 JoramTracing.log(JoramTracing.DEBUG, this + ": MessageListener" 258 + " set."); 259 } 260 261 266 public MessageListener getMessageListener() throws JMSException 267 { 268 if (closed) 269 throw new IllegalStateException ("Forbidden call on a closed consumer."); 270 271 return messageListener; 272 } 273 274 279 public String getMessageSelector() throws JMSException 280 { 281 if (closed) 282 throw new IllegalStateException ("Forbidden call on a closed consumer."); 283 284 return selector; 285 } 286 287 296 public Message receive(long timeOut) throws JMSException 297 { 298 synchronized(this) { 300 if (JoramTracing.dbgClient) 301 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 302 + ": requests to receive a message."); 303 304 if (closed) 305 throw new IllegalStateException ("Forbidden call on a closed consumer."); 306 307 if (messageListener != null) { 308 if (JoramTracing.dbgClient) 309 JoramTracing.log(JoramTracing.WARN, "Improper call as a" 310 + " listener exists for this consumer."); 311 } 312 else if (sess.msgListeners > 0) { 313 if (JoramTracing.dbgClient) 314 JoramTracing.log(JoramTracing.WARN, "Improper call as" 315 + " asynchronous consumers have already" 316 + " been set on the session."); 317 } 318 pendingReq = new ConsumerReceiveRequest(targetName, selector, timeOut, 319 queueMode); 320 pendingReq.setRequestId(sess.cnx.nextRequestId()); 321 receiving = true; 322 323 if (timeOut > 0) { 325 replyingTask = new ConsumerReplyTask(pendingReq); 326 sess.schedule(replyingTask, timeOut); 327 } 328 } 329 330 ConsumerMessages reply = 332 (ConsumerMessages) sess.cnx.syncRequest(pendingReq); 333 334 synchronized(this) { 336 receiving = false; 337 pendingReq = null; 338 if (replyingTask != null) 339 replyingTask.cancel(); 340 if (JoramTracing.dbgClient) 341 JoramTracing.log(JoramTracing.DEBUG, this + ": received a" 342 + " reply."); 343 344 Vector msgs = reply.getMessages(); 345 if (msgs != null && ! msgs.isEmpty()) { 346 com.scalagent.kjoram.messages.Message msg = 347 (com.scalagent.kjoram.messages.Message) msgs.elementAt(0); 348 String msgId = msg.getIdentifier(); 349 if (sess.autoAck) 351 sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId, 352 queueMode)); 353 else 355 sess.prepareAck(targetName, msgId, queueMode); 356 357 return Message.wrapMomMessage(sess, msg); 358 } 359 else 360 return null; 361 } 362 } 363 364 373 public Message receive() throws JMSException 374 { 375 return receive(0); 376 } 377 378 387 public Message receiveNoWait() throws JMSException 388 { 389 return receive(-1); 390 } 391 392 397 public void close() throws JMSException 398 { 399 if (closed) 401 return; 402 403 if (JoramTracing.dbgClient) 404 JoramTracing.log(JoramTracing.DEBUG, "--- " + this 405 + ": closing..."); 406 407 syncro(); 409 410 Object lock = null; 412 if (pendingReq != null) 413 lock = sess.cnx.requestsTable.remove(pendingReq.getKey()); 414 sess.consumers.removeElement(this); 415 416 try { 418 if (messageListener != null) { 419 if (JoramTracing.dbgClient) 420 JoramTracing.log(JoramTracing.DEBUG, "Unsetting listener."); 421 422 if (queueMode) { 423 ConsumerUnsetListRequest unsetLR = 424 new ConsumerUnsetListRequest(true); 425 unsetLR.setCancelledRequestId(pendingReq.getRequestId()); 426 sess.cnx.syncRequest(unsetLR); 427 } 428 } 429 430 if (durableSubscriber) 431 sess.cnx.syncRequest(new ConsumerCloseSubRequest(targetName)); 432 else if (! queueMode) 433 sess.cnx.syncRequest(new ConsumerUnsubRequest(targetName)); 434 } 435 catch (JMSException jE) {} 437 438 if (lock != null && receiving) { 440 if (JoramTracing.dbgClient) 441 JoramTracing.log(JoramTracing.DEBUG, "Replying to the" 442 + " pending receive " 443 + pendingReq.getRequestId() 444 + " with a null message."); 445 446 sess.cnx.repliesTable.put(pendingReq.getKey(), new ConsumerMessages()); 447 448 synchronized(lock) { 449 lock.notify(); 450 } 451 } 452 453 syncro(); 455 456 closed = true; 457 458 if (JoramTracing.dbgClient) 459 JoramTracing.log(JoramTracing.DEBUG, this + ": closed."); 460 } 461 462 466 synchronized void syncro() {} 467 468 472 synchronized void onMessage(com.scalagent.kjoram.messages.Message message) 473 { 474 String msgId = message.getIdentifier(); 475 476 try { 477 if (messageListener == null) { 480 if (JoramTracing.dbgClient) 481 JoramTracing.log(JoramTracing.WARN, this + ": an" 482 + " asynchronous delivery arrived" 483 + " for an improperly unset listener:" 484 + " denying the message."); 485 sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, 486 queueMode, true)); 487 } 488 else { 489 if (! sess.autoAck) 491 sess.prepareAck(targetName, msgId, queueMode); 492 493 try { 494 messageListener.onMessage(Message.wrapMomMessage(sess, message)); 495 if (sess.autoAck) 497 sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId, 498 queueMode)); 499 } 500 catch (JMSException jE) { 503 JoramTracing.log(JoramTracing.ERROR, this 504 + ": error while processing the" 505 + " received message: " + jE); 506 507 if (queueMode) 508 sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, 509 queueMode)); 510 else 511 sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId, 512 queueMode)); 513 } 514 catch (RuntimeException rE) { 517 JoramTracing.log(JoramTracing.ERROR, this 518 + ": RuntimeException thrown" 519 + " by the listener: " + rE); 520 521 if (sess.autoAck && queueMode) 522 sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId, 523 queueMode)); 524 else if (sess.autoAck && ! queueMode) 525 sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId, 526 queueMode)); 527 } 528 if (queueMode) { 530 pendingReq = new ConsumerSetListRequest(targetName, selector, true); 531 pendingReq.setRequestId(sess.cnx.nextRequestId()); 532 sess.cnx.requestsTable.put(pendingReq.getKey(), this); 533 sess.cnx.asyncRequest(pendingReq); 534 } 535 } 536 } 537 catch (JMSException jE) { 541 JoramTracing.log(JoramTracing.ERROR, this + ": " + jE); 542 } 543 } 544 545 549 private class ConsumerReplyTask extends TimerTask 550 { 551 552 private AbstractJmsRequest request; 553 554 private ConsumerMessages nullReply; 555 556 561 ConsumerReplyTask(AbstractJmsRequest request) 562 { 563 this.request = request; 564 this.nullReply = new ConsumerMessages(request.getRequestId(), 565 targetName, 566 queueMode); 567 } 568 569 573 public void run() 574 { 575 try { 576 if (JoramTracing.dbgClient) 577 JoramTracing.log(JoramTracing.WARN, "Receive request" + 578 " answered because timer expired"); 579 580 Lock lock = (Lock) sess.cnx.requestsTable.remove(request.getKey()); 581 582 if (lock == null) 583 return; 584 585 synchronized (lock) { 586 sess.cnx.repliesTable.put(request.getKey(), nullReply); 587 lock.notify(); 588 } 589 } 590 catch (Exception e) {} 591 } 592 } 593 } 594 | Popular Tags |