1 17 package org.apache.servicemix.jbi.messaging; 18 19 import java.util.Iterator ; 20 import java.util.List ; 21 import java.util.Map ; 22 23 import javax.jbi.JBIException; 24 import javax.jbi.component.Component; 25 import javax.jbi.component.ComponentLifeCycle; 26 import javax.jbi.messaging.DeliveryChannel; 27 import javax.jbi.messaging.ExchangeStatus; 28 import javax.jbi.messaging.MessageExchange; 29 import javax.jbi.messaging.MessageExchangeFactory; 30 import javax.jbi.messaging.MessagingException; 31 import javax.jbi.messaging.MessageExchange.Role; 32 import javax.jbi.servicedesc.ServiceEndpoint; 33 import javax.transaction.Transaction ; 34 import javax.transaction.TransactionManager ; 35 import javax.xml.namespace.QName ; 36 37 import org.apache.activemq.util.IdGenerator; 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 import org.apache.servicemix.JbiConstants; 41 import org.apache.servicemix.MessageExchangeListener; 42 import org.apache.servicemix.jbi.ExchangeTimeoutException; 43 import org.apache.servicemix.jbi.container.ActivationSpec; 44 import org.apache.servicemix.jbi.container.JBIContainer; 45 import org.apache.servicemix.jbi.framework.ComponentContextImpl; 46 import org.apache.servicemix.jbi.framework.ComponentMBeanImpl; 47 import org.apache.servicemix.jbi.util.BoundedLinkedQueue; 48 49 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 50 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 51 52 57 public class DeliveryChannelImpl implements DeliveryChannel { 58 59 private static final Log log = LogFactory.getLog(DeliveryChannelImpl.class); 60 61 private JBIContainer container; 62 private ComponentContextImpl context; 63 private ComponentMBeanImpl component; 64 private BoundedLinkedQueue queue = new BoundedLinkedQueue(); 65 private IdGenerator idGenerator = new IdGenerator(); 66 private MessageExchangeFactory inboundFactory; 67 private int intervalCount = 0; 68 private long lastSendTime = System.currentTimeMillis(); 69 private long lastReceiveTime = System.currentTimeMillis(); 70 private AtomicBoolean closed = new AtomicBoolean(false); 71 private Map waiters = new ConcurrentHashMap(); 72 73 78 private Map exchangesById = new ConcurrentHashMap(); 79 80 86 public DeliveryChannelImpl(ComponentMBeanImpl component) { 87 this.component = component; 88 this.container = component.getContainer(); 89 } 90 91 94 public int getQueueSize() { 95 return queue.size(); 96 } 97 98 101 public int getQueueCapacity() { 102 return queue.capacity(); 103 } 104 105 110 public void setQueueCapacity(int value) { 111 queue.setCapacity(value); 112 } 113 114 119 public void close() throws MessagingException { 120 if (this.closed.compareAndSet(false, true)) { 121 if (log.isDebugEnabled()) { 122 log.debug("Closing DeliveryChannel " + this); 123 } 124 List pending = queue.closeAndFlush(); 125 for (Iterator iter = pending.iterator(); iter.hasNext();) { 126 MessageExchangeImpl messageExchange = (MessageExchangeImpl) iter.next(); 127 if (messageExchange.getTransactionContext() != null && messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) { 128 notifyExchange(messageExchange.getMirror(), messageExchange.getMirror(), "close"); 129 } 130 } 131 Object [] threads = waiters.keySet().toArray(); 133 for (int i = 0; i < threads.length; i++) { 134 ((Thread ) threads[i]).interrupt(); 135 } 136 ServiceEndpoint[] endpoints = container.getRegistry().getEndpointsForComponent(component.getComponentNameSpace()); 138 for (int i = 0; i < endpoints.length; i++) { 139 try { 140 component.getContext().deactivateEndpoint(endpoints[i]); 141 } catch (JBIException e) { 142 log.error("Error deactivating endpoint", e); 143 } 144 } 145 } 148 } 149 150 protected void checkNotClosed() throws MessagingException { 151 if (closed.get()) { 152 throw new MessagingException(this + " has been closed."); 153 } 154 } 155 156 162 public MessageExchangeFactory createExchangeFactory() { 163 MessageExchangeFactoryImpl result = createMessageExchangeFactory(); 164 result.setContext(context); 165 ActivationSpec activationSpec = context.getActivationSpec(); 166 if (activationSpec != null) { 167 String componentName = context.getComponentNameSpace().getName(); 168 QName serviceName = activationSpec.getDestinationService(); 170 if (serviceName != null) { 171 result.setServiceName(serviceName); 172 log.debug("default destination serviceName for " + componentName + " = " + serviceName); 173 } 174 QName interfaceName = activationSpec.getDestinationInterface(); 175 if (interfaceName != null) { 176 result.setInterfaceName(interfaceName); 177 log.debug("default destination interfaceName for " + componentName + " = " + interfaceName); 178 } 179 QName operationName = activationSpec.getDestinationOperation(); 180 if (operationName != null) { 181 result.setOperationName(operationName); 182 log.debug("default destination operationName for " + componentName + " = " + operationName); 183 } 184 String endpointName = activationSpec.getDestinationEndpoint(); 185 if (endpointName != null) { 186 boolean endpointSet = false; 187 log.debug("default destination endpointName for " + componentName + " = " + endpointName); 188 if (serviceName != null && endpointName != null) { 189 endpointName = endpointName.trim(); 190 ServiceEndpoint endpoint = container.getRegistry().getEndpoint(serviceName, endpointName); 191 if (endpoint != null) { 192 result.setEndpoint(endpoint); 193 log.info("Set default destination endpoint for " + componentName + " to " + endpoint); 194 endpointSet = true; 195 } 196 } 197 if (!endpointSet) { 198 log.warn("Could not find destination endpoint for " + componentName + " service(" + serviceName 199 + ") with endpointName " + endpointName); 200 } 201 } 202 } 203 return result; 204 } 205 206 212 public MessageExchangeFactory createExchangeFactory(QName interfaceName) { 213 MessageExchangeFactoryImpl result = createMessageExchangeFactory(); 214 result.setInterfaceName(interfaceName); 215 return result; 216 } 217 218 224 public MessageExchangeFactory createExchangeFactoryForService(QName serviceName) { 225 MessageExchangeFactoryImpl result = createMessageExchangeFactory(); 226 result.setServiceName(serviceName); 227 return result; 228 } 229 230 236 public MessageExchangeFactory createExchangeFactory(ServiceEndpoint endpoint) { 237 MessageExchangeFactoryImpl result = createMessageExchangeFactory(); 238 result.setEndpoint(endpoint); 239 return result; 240 } 241 242 protected MessageExchangeFactoryImpl createMessageExchangeFactory() { 243 MessageExchangeFactoryImpl messageExchangeFactory = new MessageExchangeFactoryImpl(idGenerator, closed); 244 messageExchangeFactory.setContext(context); 245 return messageExchangeFactory; 246 } 247 248 252 public MessageExchange accept() throws MessagingException { 253 return accept(Long.MAX_VALUE); 254 } 255 256 263 public MessageExchange accept(long timeoutMS) throws MessagingException { 264 try { 265 checkNotClosed(); 266 MessageExchangeImpl me = (MessageExchangeImpl) queue.poll(timeoutMS); 267 if (me != null) { 268 if (me.getPacket().isAborted()) { 271 if (log.isDebugEnabled()) { 272 log.debug("Aborted " + me.getExchangeId() + " in " + this); 273 } 274 me = null; 275 } else { 276 if (log.isDebugEnabled()) { 277 log.debug("Accepting " + me.getExchangeId() + " in " + this); 278 } 279 if (me.getTxLock() != null && me.getStatus() != ExchangeStatus.ACTIVE) { 282 notifyExchange(me.getMirror(), me.getTxLock(), "acceptFinishedExchangeWithTxLock"); 283 me.handleAccept(); 284 if (log.isTraceEnabled()) { 285 log.trace("Accepted: " + me); 286 } 287 } 288 else if (me.isTransacted() && me.getStatus() != ExchangeStatus.ACTIVE) { 290 me.handleAccept(); 292 if (log.isTraceEnabled()) { 293 log.trace("Accepted: " + me); 294 } 295 } 296 else { 297 resumeTx(me); 298 me.handleAccept(); 299 if (log.isTraceEnabled()) { 300 log.trace("Accepted: " + me); 301 } 302 } 303 } 304 } 305 return me; 306 } 307 catch (InterruptedException e) { 308 throw new MessagingException("accept failed", e); 309 } 310 } 311 312 protected void autoSetPersistent(MessageExchangeImpl me) { 313 Boolean persistent = me.getPersistent(); 314 if (persistent == null) { 315 if (context.getActivationSpec().getPersistent() != null) { 316 persistent = context.getActivationSpec().getPersistent(); 317 } else { 318 persistent = Boolean.valueOf(context.getContainer().isPersistent()); 319 } 320 me.setPersistent(persistent); 321 } 322 } 323 324 protected void throttle() { 325 if (component.isExchangeThrottling()) { 326 if (component.getThrottlingInterval() > intervalCount) { 327 intervalCount = 0; 328 try { 329 Thread.sleep(component.getThrottlingTimeout()); 330 } 331 catch (InterruptedException e) { 332 log.warn("throttling failed", e); 333 } 334 } 335 intervalCount++; 336 } 337 } 338 339 protected void doSend(MessageExchangeImpl me, boolean sync) throws MessagingException { 340 MessageExchangeImpl mirror = me.getMirror(); 341 boolean finished = me.getStatus() != ExchangeStatus.ACTIVE; 342 try { 343 if (log.isTraceEnabled()) { 344 log.trace("Sent: " + me); 345 } 346 if (me.getPacket().isAborted()) { 348 throw new ExchangeTimeoutException(me); 349 } 350 autoEnlistInTx(me); 352 autoSetPersistent(me); 354 throttle(); 356 incrementOutboundStats(); 358 if (me.getRole() == Role.CONSUMER) { 360 me.setSourceId(component.getComponentNameSpace()); 361 } 362 container.callListeners(me); 364 me.handleSend(sync); 365 mirror.setTxState(MessageExchangeImpl.TX_STATE_NONE); 366 if (finished && 369 me.getTxLock() == null && 370 me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED && 371 me.isPushDelivery() == false && 372 me.getRole() == Role.CONSUMER) { 373 me.setTransactionContext(null); 374 } 375 container.sendExchange(mirror); 376 } catch (MessagingException e) { 377 if (log.isDebugEnabled()) { 378 log.debug("Exception processing: " + me.getExchangeId() + " in " + this); 379 } 380 throw e; 381 } finally { 382 if (me.getTxLock() != null) { 384 if (mirror.getTxState() == MessageExchangeImpl.TX_STATE_ENLISTED) { 385 suspendTx(mirror); 386 } 387 synchronized (me.getTxLock()) { 388 notifyExchange(me, me.getTxLock(), "doSendWithTxLock"); 389 } 390 } 391 } 392 } 393 394 400 public void send(MessageExchange messageExchange) throws MessagingException { 401 checkNotClosed(); 403 if (log.isDebugEnabled()) { 405 log.debug("Send " + messageExchange.getExchangeId() + " in " + this); 406 } 407 messageExchange.setProperty(JbiConstants.SEND_SYNC, null); 409 MessageExchangeImpl me = (MessageExchangeImpl) messageExchange; 411 doSend(me, false); 412 } 413 414 421 public boolean sendSync(MessageExchange messageExchange) throws MessagingException { 422 return sendSync(messageExchange, 0); 423 } 424 425 433 public boolean sendSync(MessageExchange messageExchange, long timeout) throws MessagingException { 434 checkNotClosed(); 436 if (log.isDebugEnabled()) { 438 log.debug("SendSync " + messageExchange.getExchangeId() + " in " + this); 439 } 440 boolean result = false; 441 messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE); 443 MessageExchangeImpl me = (MessageExchangeImpl) messageExchange; 445 String exchangeKey = getKeyForExchange(me); 446 try { 447 exchangesById.put(exchangeKey, me); 448 synchronized (me) { 450 doSend(me, true); 451 if (me.getSyncState() != MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) { 452 waitForExchange(me, me, timeout, "sendSync"); 453 } else { 454 if (log.isDebugEnabled()) { 455 log.debug("Exchange " + messageExchange.getExchangeId() + " has already been answered (no need to wait)"); 456 } 457 } 458 } 459 if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) { 460 me.handleAccept(); 461 resumeTx(me); 467 result = true; 469 } else { 470 if (log.isDebugEnabled()) { 472 log.debug("Exchange " + messageExchange.getExchangeId() + " has been aborted"); 473 } 474 me.getPacket().setAborted(true); 475 result = false; 476 } 477 } catch (InterruptedException e) { 478 throw new MessagingException(e); 479 } catch (RuntimeException e) { 480 e.printStackTrace(); 481 throw e; 482 } finally { 483 exchangesById.remove(exchangeKey); 484 } 485 return result; 486 } 487 488 491 public JBIContainer getContainer() { 492 return container; 493 } 494 495 498 public void setContainer(JBIContainer container) { 499 this.container = container; 500 } 501 502 505 public ComponentMBeanImpl getComponent() { 506 return component; 507 } 508 509 514 public ComponentContextImpl getContext() { 515 return context; 516 } 517 518 523 public void setContext(ComponentContextImpl context) { 524 this.context = context; 525 } 526 527 protected void incrementInboundStats() { 528 MessagingStats messagingStats = component.getMessagingStats(); 529 long currentTime = System.currentTimeMillis(); 530 if (container.isNotifyStatistics()) { 531 long oldCount = messagingStats.getInboundExchanges().getCount(); 532 messagingStats.getInboundExchanges().increment(); 533 component.firePropertyChanged( 534 "inboundExchangeCount", 535 new Long (oldCount), 536 new Long (messagingStats.getInboundExchanges().getCount())); 537 double oldRate = messagingStats.getInboundExchangeRate().getAverageTime(); 538 messagingStats.getInboundExchangeRate().addTime(currentTime - lastReceiveTime); 539 component.firePropertyChanged("inboundExchangeRate", 540 new Double (oldRate), 541 new Double (messagingStats.getInboundExchangeRate().getAverageTime())); 542 } else { 543 messagingStats.getInboundExchanges().increment(); 544 messagingStats.getInboundExchangeRate().addTime(currentTime - lastReceiveTime); 545 } 546 lastReceiveTime = currentTime; 547 } 548 549 protected void incrementOutboundStats() { 550 MessagingStats messagingStats = component.getMessagingStats(); 551 long currentTime = System.currentTimeMillis(); 552 if (container.isNotifyStatistics()) { 553 long oldCount = messagingStats.getOutboundExchanges().getCount(); 554 messagingStats.getOutboundExchanges().increment(); 555 component.firePropertyChanged( 556 "outboundExchangeCount", 557 new Long (oldCount), 558 new Long (messagingStats.getOutboundExchanges().getCount())); 559 double oldRate = messagingStats.getOutboundExchangeRate().getAverageTime(); 560 messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime); 561 component.firePropertyChanged("outboundExchangeRate", 562 new Double (oldRate), 563 new Double (messagingStats.getOutboundExchangeRate().getAverageTime())); 564 } else { 565 messagingStats.getOutboundExchanges().increment(); 566 messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime); 567 } 568 lastSendTime = currentTime; 569 } 570 571 577 public void processInBound(MessageExchangeImpl me) throws MessagingException { 578 if (log.isTraceEnabled()) { 579 log.trace("Processing inbound exchange: " + me); 580 } 581 checkNotClosed(); 583 incrementInboundStats(); 585 586 MessageExchangeImpl original = (MessageExchangeImpl) exchangesById.get(getKeyForExchange(me)); 588 if (original != null && me != original) { 589 original.copyFrom(me); 590 me = original; 591 } 592 if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) { 595 { 599 suspendTx(original); 602 me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED); 603 notifyExchange(original, original, "processInboundSynchronousExchange"); 604 } 605 return; 606 } 607 608 MessageExchangeListener listener = getExchangeListener(); 612 if (listener != null) { 613 me.handleAccept(); 614 if (log.isTraceEnabled()) { 615 log.trace("Received: " + me); 616 } 617 me.setPushDeliver(true); 620 ClassLoader old = Thread.currentThread().getContextClassLoader(); 622 try { 623 Thread.currentThread().setContextClassLoader(component.getComponent().getClass().getClassLoader()); 624 listener.onMessageExchange(me); 625 } finally { 626 Thread.currentThread().setContextClassLoader(old); 627 } 628 return; 630 } 631 632 634 if (me.isTransacted() && me.getStatus() == ExchangeStatus.ACTIVE) { 638 if (me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED) { 641 try { 642 suspendTx(me); 643 queue.put(me); 644 } catch (InterruptedException e) { 645 log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e); 646 me.getPacket().setAborted(true); 647 } 648 } 649 else { 654 Object lock = new Object (); 655 synchronized (lock) { 656 try { 657 me.setTxLock(lock); 658 suspendTx(me); 659 queue.put(me); 660 waitForExchange(me, lock, 0, "processInboundTransactionalExchange"); 661 } catch (InterruptedException e) { 662 log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e); 663 me.getPacket().setAborted(true); 664 } finally { 665 me.setTxLock(null); 666 resumeTx(me); 667 } 668 } 669 } 670 } 671 else { 675 try { 676 queue.put(me); 677 } catch (InterruptedException e) { 678 log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e); 679 me.getPacket().setAborted(true); 680 } 681 } 682 } 683 684 protected MessageExchangeListener getExchangeListener() { 685 Component component = this.component.getComponent(); 686 if (component instanceof MessageExchangeListener) { 687 return (MessageExchangeListener) component; 688 } 689 ComponentLifeCycle lifecycle = this.component.getLifeCycle(); 690 if (lifecycle instanceof MessageExchangeListener) { 691 return (MessageExchangeListener) lifecycle; 692 } 693 return null; 694 } 695 696 702 protected void waitForExchange(MessageExchangeImpl me, Object lock, long timeout, String from) throws InterruptedException { 703 if (log.isDebugEnabled()) { 705 log.debug("Waiting for exchange " + me.getExchangeId() + " (" + Integer.toHexString(me.hashCode()) + ") to be answered in " + this + " from " + from); 706 } 707 Thread th = Thread.currentThread(); 708 try { 709 waiters.put(th, Boolean.TRUE); 710 lock.wait(timeout); 711 } finally { 712 waiters.remove(th); 713 } 714 if (log.isDebugEnabled()) { 715 log.debug("Notified: " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from " + from); 716 } 717 } 718 719 protected void notifyExchange(MessageExchangeImpl me, Object lock, String from) { 720 if (log.isDebugEnabled()) { 721 log.debug("Notifying exchange " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from " + from); 722 } 723 synchronized (lock) { 724 lock.notify(); 725 } 726 } 727 728 733 public MessageExchangeFactory getInboundFactory() { 734 if (inboundFactory == null) { 735 inboundFactory = createExchangeFactory(); 736 } 737 return inboundFactory; 738 } 739 740 protected void suspendTx(MessageExchangeImpl me) { 741 try { 742 Transaction oldTx = me.getTransactionContext(); 743 if (oldTx != null) { 744 TransactionManager tm = (TransactionManager ) container.getTransactionManager(); 745 if (tm != null) { 746 if (log.isDebugEnabled()) { 747 log.debug("Suspending transaction for " + me.getExchangeId() + " in " + this); 748 } 749 Transaction tx = tm.suspend(); 750 if (tx != oldTx) { 751 throw new IllegalStateException ("the transaction context set in the messageExchange is not bound to the current thread"); 752 } 753 } 754 } 755 } catch (Exception e) { 756 log.info("Exchange " + me.getExchangeId() + " aborted due to transaction exception", e); 757 me.getPacket().setAborted(true); 758 } 759 } 760 761 protected void resumeTx(MessageExchangeImpl me) throws MessagingException { 762 try { 763 Transaction oldTx = me.getTransactionContext(); 764 if (oldTx != null) { 765 TransactionManager tm = (TransactionManager ) container.getTransactionManager(); 766 if (tm != null) { 767 if (log.isDebugEnabled()) { 768 log.debug("Resuming transaction for " + me.getExchangeId() + " in " + this); 769 } 770 tm.resume(oldTx); 771 } 772 } 773 } catch (Exception e) { 774 throw new MessagingException(e); 775 } 776 } 777 778 785 protected void autoEnlistInTx(MessageExchangeImpl me) throws MessagingException { 786 try { 787 if (container.isAutoEnlistInTransaction()) { 788 TransactionManager tm = (TransactionManager ) container.getTransactionManager(); 789 if (tm != null) { 790 Transaction tx = tm.getTransaction(); 791 if (tx != null) { 792 Object oldTx = me.getTransactionContext(); 793 if (oldTx == null) { 794 me.setTransactionContext(tx); 795 } else if (oldTx != tx) { 796 throw new IllegalStateException ( 797 "the transaction context set in the messageExchange is not bound to the current thread"); 798 } 799 } 800 } 801 } 802 } catch (Exception e) { 803 throw new MessagingException(e); 804 } 805 } 806 807 810 public String toString() { 811 return "DeliveryChannel{" + component.getName() + "}"; 812 } 813 814 private String getKeyForExchange(MessageExchangeImpl me) { 815 return (me.getRole() == Role.CONSUMER ? "consumer:" : "provider:") + me.getExchangeId(); 816 } 817 } 818 | Popular Tags |