1 46 package org.mr.api.jms; 47 48 import java.io.Serializable ; 49 import java.util.Iterator ; 50 import java.util.List ; 51 52 import javax.jms.Destination ; 53 import javax.jms.IllegalStateException ; 54 import javax.jms.JMSException ; 55 import javax.jms.Message ; 56 import javax.jms.MessageConsumer ; 57 import javax.jms.MessageListener ; 58 import javax.jms.Queue ; 59 import javax.jms.QueueReceiver ; 60 import javax.jms.Session ; 61 import javax.jms.Topic ; 62 import javax.jms.TopicSubscriber ; 63 64 import org.apache.commons.logging.Log; 65 import org.apache.commons.logging.LogFactory; 66 import org.mr.MantaException; 67 import org.mr.core.protocol.MantaBusMessage; 68 import org.mr.core.protocol.MantaBusMessageConsts; 69 import org.mr.core.util.SynchronizedPriorityQueue; 70 import org.mr.core.util.SynchronizedQueue; 71 import org.mr.core.util.SystemTime; 72 import org.mr.kernel.services.ServiceConsumer; 73 74 85 public class MantaMessageConsumer implements MessageConsumer , QueueReceiver , 86 TopicSubscriber , Serializable { 87 88 98 public MantaMessageConsumer(String clientID, MantaSession sess, 99 Destination destination, String messageSelector, boolean noLocal, 100 ServiceConsumer service) throws JMSException { 101 102 if (sess == null) 103 throw new JMSException ("MNJMS00060 : CAN NOT CREATE A CONSUMER. SESSION IS NULL."); 104 105 if (messageSelector == null || messageSelector.length()==0) 106 messageSelector = null; 107 108 111 theMessageSelector = messageSelector; 112 113 isClosed = false; 114 theDestination = destination; 115 this.clientID = clientID; 116 this.creatingSession = sess; 117 isNoLocal = noLocal; 118 theService = service; 119 innerQueue = new SynchronizedPriorityQueue(10); 121 log = LogFactory.getLog("MantaMessageConsumer"); 122 } 124 134 public synchronized void close() throws JMSException { 135 136 if (isClosed) 137 return; 138 139 isClosed = true; 141 142 147 if (creatingSession != null) 150 creatingSession.removeConsumer(this); 151 152 creatingSession = null; 153 theMessageSelector = null; 154 theService = null; 155 clientID = null; 156 this.messageListener = null; 157 } 158 159 169 public MessageListener getMessageListener() throws JMSException { 170 checkLegalOperation(); 171 return this.messageListener; 172 } 174 185 public String getMessageSelector() throws JMSException { 186 checkLegalOperation(); 187 return theMessageSelector; 188 } 190 206 public Message receive() throws JMSException { 207 208 return receive(0); 209 } 211 229 public Message receive(long timeout) throws JMSException { 230 231 checkLegalOperation(); 232 if (timeout < 0) 233 return null; 234 235 if (timeout == 0) 236 timeout = Long.MAX_VALUE; 237 238 if (creatingSession.isStopped) { 239 long startTime = System.currentTimeMillis(); 240 try { 241 synchronized (creatingSession.lockMonitor) { 242 creatingSession.lockMonitor.wait(timeout); 243 } 245 246 } catch (InterruptedException e) { 247 if (log.isErrorEnabled()) 248 log.error("Error while waiting for the session to resume. ", e); 249 } 250 timeout = timeout - (System.currentTimeMillis() - startTime); 254 if (timeout < 1000) return null; 256 } 257 258 259 260 261 boolean ackOrHold = true; 262 MantaBusMessage cbm = null; 263 MantaMessage jmsMessage = null; 264 if (this.theDestination instanceof Topic ) { 267 268 boolean goOn = true; 269 270 282 while (goOn && timeout >= 0) { 286 goOn = false; 287 long start = SystemTime.currentTimeMillis(); 288 cbm = (MantaBusMessage) innerQueue.dequeue(timeout); 289 if (cbm != null && 290 cbm.getValidUntil() < SystemTime.gmtCurrentTimeMillis()) { 291 goOn = true; 292 continue; 293 } 294 if (cbm != null) { 295 jmsMessage = convertToJMSMessage(cbm,creatingSession); 296 if (isBreakingNoLocal(cbm, jmsMessage)) { 297 goOn = true; 298 long now = SystemTime.currentTimeMillis(); 299 timeout = timeout - (now - start); 301 ackOrHold=false; 302 } 303 else 304 ackOrHold=true; 305 } 307 } 309 } else { try { 311 313 synchronized (innerQueue) { 314 if (innerQueue.size() > 0) 315 cbm = (MantaBusMessage) innerQueue.dequeue(timeout); 316 } 317 320 if (cbm == null){ 321 cbm = creatingSession.receive(this.getService(), timeout); 323 } 324 else { 325 creatingSession.startLocalTransactionIfNeeded(); 326 } 327 328 if (cbm != null) { 330 jmsMessage = convertToJMSMessage(cbm, creatingSession); 331 if (jmsMessage == null) { 332 ackOrHold = false; 334 } 335 } 336 } catch (MantaException me) { 337 throw new JMSException ("MNJMS00062 : METHOD receive() FAILED INTERNALLY. ERROR TEXT : "+me.getMessage()); 338 } 339 } 340 341 if (cbm == null) { 342 return null; 343 } 344 345 jmsMessage.setWriteableState(false); 346 if (creatingSession.sessionAcknowledgementMode == Session.CLIENT_ACKNOWLEDGE || 347 creatingSession.getTransacted()) { 348 if (ackOrHold) { 351 MantaMessage msg = jmsMessage.makeCopy(); 352 cbm.setPayload(msg); 353 } 354 } 355 356 creatingSession.ackOrHold(cbm); 358 return jmsMessage; 359 360 361 } 363 364 365 private boolean isBreakingNoLocal(MantaBusMessage cbm, 366 MantaMessage jmsMessage) throws JMSException { 367 368 String connId = jmsMessage.getConnId(); 369 if (isNoLocal && connId != null) { 370 if (connId.equals(creatingSession.owningConnection.getClientID())) { 373 creatingSession.ackOrHold(cbm); return true; 375 } } return false; 378 } 379 380 390 public Message receiveNoWait() throws JMSException { 391 392 return receive(3000L); 393 394 } 396 414 public void setMessageListener(MessageListener listener) throws JMSException { 415 checkLegalOperation(); 416 417 if (this.messageListener != null && theDestination instanceof Queue ) 419 this.creatingSession.deregisterFromQueue(this); 420 421 this.messageListener = listener; 423 424 if (listener != null) { 425 synchronized (innerQueue) { 427 MantaBusMessage mbm; 428 synchronized(creatingSession.listenersCount) { 429 if (creatingSession.isClosed||creatingSession.isClosing) 430 return; 431 creatingSession.listenersCount.add(); 432 } 433 434 while (!innerQueue.isEmpty()) { 435 mbm = (MantaBusMessage) innerQueue.dequeue(); 436 if (mbm.getValidUntil()>SystemTime.gmtCurrentTimeMillis()) { 437 creatingSession.ackOrHold(mbm); 438 listener.onMessage(convertToJMSMessage(mbm, creatingSession)); 439 } 440 } 441 442 synchronized(creatingSession.listenersCount) { 443 creatingSession.listenersCount.remove(); 444 if (creatingSession.listenersCount.val()==0) { 445 creatingSession.listenersCount.notifyAll(); 446 } 447 } 448 449 if (theDestination instanceof Queue ) 450 creatingSession.listenToQueue(this); 451 } 452 } 453 } 454 455 459 void feedMessageListener(MantaBusMessage mbm) 460 throws JMSException { 461 462 checkLegalOperation(); 463 MantaMessage message = convertToJMSMessage(mbm, creatingSession); 464 if (messageListener != null) { 465 String connId = message.getConnId(); 466 if (isNoLocal && connId != null && this.theDestination instanceof Topic ) { 472 if (connId.equals(creatingSession.owningConnection.getClientID())) { 473 creatingSession.ackMessage(mbm); return; 475 } } 478 if (creatingSession.sessionAcknowledgementMode==Session.CLIENT_ACKNOWLEDGE|| 479 creatingSession.sessionAcknowledgementMode==Session.SESSION_TRANSACTED) { 480 MantaMessage copy = message.makeCopy(); 481 mbm.setPayload(copy); 482 } 483 484 this.creatingSession.startLocalTransactionIfNeeded(); 486 487 creatingSession.ackOrHold(mbm); 489 490 if (this.messageListener != null) { 492 try { 493 this.messageListener.onMessage(message); 494 } catch (Throwable t) { 495 log.error("Exception in message listener: " + 496 t.getMessage()); 497 } 498 } 499 else { 500 log.error("Message arrived to a consumer with no registered listener. MessageID="+message.getJMSMessageID()); 501 } 502 } 503 else 505 innerQueue.enqueue(mbm); 506 } 507 508 511 512 515 public Queue getQueue() throws JMSException { 516 checkLegalOperation(); 517 return (Queue ) getDestination(); 518 } 519 520 523 public boolean getNoLocal() throws JMSException { 524 checkLegalOperation(); 525 return isNoLocal; 526 } 527 528 531 532 public Topic getTopic() throws JMSException { 533 checkLegalOperation(); 534 return (Topic ) getDestination(); 535 } 536 537 private void checkLegalOperation() throws JMSException { 538 if (isClosed) 539 throw new IllegalStateException ("MNJMS00061 : OPERATION UNALLOWED. METHOD FAILED: checkLegalOperation(). REASON : CONSUMER IS CLOSED."); 540 541 } 542 543 550 static MantaMessage convertToJMSMessage(MantaBusMessage message, 551 MantaSession session) throws JMSException { 552 553 MantaMessage payload = null; 554 if (message != null) { 555 if (message.getHeader( 556 MantaBusMessageConsts.HEADER_NAME_PAYLOAD_TYPE).equals( 557 MantaBusMessageConsts.PAYLOAD_TYPE_JMS)) 558 payload = (MantaMessage) (message.getPayload()); 559 560 } if (payload == null) 562 return null; 563 564 MantaMessage result = null; 565 if (payload.connId != null) { 566 result = payload.makeCopy(); 568 } else { 569 result = payload; 570 } 571 572 result.creatingSession=session; 573 result.flags = result.flags & 0x0FF9FFFF; 575 if (result.getJMSType().equals(MantaMessage.BYT_M)) { 576 MantaBytesMessage r = (MantaBytesMessage)result; 577 r.reset(); 578 } 579 else if (result.getJMSType().equals(MantaMessage.STR_M)) 580 { 581 MantaStreamMessage r = (MantaStreamMessage)result; 582 r.reset(); 583 } 584 585 result.setWriteableState(true); 586 587 if (message.isRedelivered()) 589 result.flags=result.flags|MantaMessage.IS_REDELIVERED; 590 591 result.setWriteableState(false); 592 return result; 593 } 595 Destination getDestination() throws JMSException { 596 checkLegalOperation(); 597 return theDestination; 598 } 599 600 String getClientId() { 601 return clientID; 602 } 603 604 ServiceConsumer getService() { 605 return theService; 606 } 607 608 protected Destination theDestination = null; 610 611 protected String theMessageSelector = null; 613 614 protected boolean isNoLocal; 619 620 protected MantaSession creatingSession = null; 622 623 boolean isClosed; 625 626 protected String clientID; 628 629 protected ServiceConsumer theService; 631 632 private MessageListener messageListener = null; 634 635 private SynchronizedQueue innerQueue; 636 637 private Log log; 638 639 } | Popular Tags |