KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > messaging > DeliveryChannelImpl


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.messaging;
18
19 import java.util.Iterator JavaDoc;
20 import java.util.List JavaDoc;
21 import java.util.Map JavaDoc;
22
23 import javax.jbi.JBIException;
24 import javax.jbi.component.Component;
25 import javax.jbi.component.ComponentLifeCycle;
26 import javax.jbi.messaging.DeliveryChannel;
27 import javax.jbi.messaging.ExchangeStatus;
28 import javax.jbi.messaging.MessageExchange;
29 import javax.jbi.messaging.MessageExchangeFactory;
30 import javax.jbi.messaging.MessagingException;
31 import javax.jbi.messaging.MessageExchange.Role;
32 import javax.jbi.servicedesc.ServiceEndpoint;
33 import javax.transaction.Transaction JavaDoc;
34 import javax.transaction.TransactionManager JavaDoc;
35 import javax.xml.namespace.QName JavaDoc;
36
37 import org.apache.activemq.util.IdGenerator;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.servicemix.JbiConstants;
41 import org.apache.servicemix.MessageExchangeListener;
42 import org.apache.servicemix.jbi.ExchangeTimeoutException;
43 import org.apache.servicemix.jbi.container.ActivationSpec;
44 import org.apache.servicemix.jbi.container.JBIContainer;
45 import org.apache.servicemix.jbi.framework.ComponentContextImpl;
46 import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
47 import org.apache.servicemix.jbi.util.BoundedLinkedQueue;
48
49 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
50 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
51
52 /**
53  * DeliveryChannel implementation
54  *
55  * @version $Revision: 442201 $
56  */

57 public class DeliveryChannelImpl implements DeliveryChannel {
58
59     private static final Log log = LogFactory.getLog(DeliveryChannelImpl.class);
60
61     private JBIContainer container;
62     private ComponentContextImpl context;
63     private ComponentMBeanImpl component;
64     private BoundedLinkedQueue queue = new BoundedLinkedQueue();
65     private IdGenerator idGenerator = new IdGenerator();
66     private MessageExchangeFactory inboundFactory;
67     private int intervalCount = 0;
68     private long lastSendTime = System.currentTimeMillis();
69     private long lastReceiveTime = System.currentTimeMillis();
70     private AtomicBoolean closed = new AtomicBoolean(false);
71     private Map JavaDoc waiters = new ConcurrentHashMap();
72     
    /**
     * When using clustering and sendSync, the exchange received will not be the same
     * as the one sent (because it has been serialized/deserialized).
     * We thus need to keep the original exchange in a map and override its state.
     */

78     private Map JavaDoc exchangesById = new ConcurrentHashMap();
79
80     /**
81      * Constructor
82      *
83      * @param container
84      * @param componentName
85      */

86     public DeliveryChannelImpl(ComponentMBeanImpl component) {
87         this.component = component;
88         this.container = component.getContainer();
89     }
90
91     /**
92      * @return size of the inbound Queue
93      */

94     public int getQueueSize() {
95         return queue.size();
96     }
97
98     /**
99      * @return the capacity of the inbound queue
100      */

101     public int getQueueCapacity() {
102         return queue.capacity();
103     }
104
105     /**
106      * Set the inbound queue capacity
107      *
108      * @param value
109      */

110     public void setQueueCapacity(int value) {
111         queue.setCapacity(value);
112     }
113
114     /**
115      * close the delivery channel
116      *
117      * @throws MessagingException
118      */

119     public void close() throws MessagingException {
120         if (this.closed.compareAndSet(false, true)) {
121             if (log.isDebugEnabled()) {
122                 log.debug("Closing DeliveryChannel " + this);
123             }
124             List JavaDoc pending = queue.closeAndFlush();
125             for (Iterator JavaDoc iter = pending.iterator(); iter.hasNext();) {
126                 MessageExchangeImpl messageExchange = (MessageExchangeImpl) iter.next();
127                 if (messageExchange.getTransactionContext() != null && messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
128                     notifyExchange(messageExchange.getMirror(), messageExchange.getMirror(), "close");
129                 }
130             }
131             // Interrupt all blocked thread
132
Object JavaDoc[] threads = waiters.keySet().toArray();
133             for (int i = 0; i < threads.length; i++) {
134                 ((Thread JavaDoc) threads[i]).interrupt();
135             }
136             // deactivate all endpoints from this component
137
ServiceEndpoint[] endpoints = container.getRegistry().getEndpointsForComponent(component.getComponentNameSpace());
138             for (int i = 0; i < endpoints.length; i++) {
139                 try {
140                     component.getContext().deactivateEndpoint(endpoints[i]);
141                 } catch (JBIException e) {
142                     log.error("Error deactivating endpoint", e);
143                 }
144             }
145             // TODO: Cause all accepts to return null
146
// TODO: Abort all pending exchanges
147
}
148     }
149
150     protected void checkNotClosed() throws MessagingException {
151         if (closed.get()) {
152             throw new MessagingException(this + " has been closed.");
153         }
154     }
155
156     /**
157      * Create a message exchange factory. This factory will create exchange instances with all appropriate properties
158      * set to null.
159      *
160      * @return a message exchange factory
161      */

162     public MessageExchangeFactory createExchangeFactory() {
163         MessageExchangeFactoryImpl result = createMessageExchangeFactory();
164         result.setContext(context);
165         ActivationSpec activationSpec = context.getActivationSpec();
166         if (activationSpec != null) {
167             String JavaDoc componentName = context.getComponentNameSpace().getName();
168             // lets auto-default the container-routing information
169
QName JavaDoc serviceName = activationSpec.getDestinationService();
170             if (serviceName != null) {
171                 result.setServiceName(serviceName);
172                 log.debug("default destination serviceName for " + componentName + " = " + serviceName);
173             }
174             QName JavaDoc interfaceName = activationSpec.getDestinationInterface();
175             if (interfaceName != null) {
176                 result.setInterfaceName(interfaceName);
177                 log.debug("default destination interfaceName for " + componentName + " = " + interfaceName);
178             }
179             QName JavaDoc operationName = activationSpec.getDestinationOperation();
180             if (operationName != null) {
181                 result.setOperationName(operationName);
182                 log.debug("default destination operationName for " + componentName + " = " + operationName);
183             }
184             String JavaDoc endpointName = activationSpec.getDestinationEndpoint();
185             if (endpointName != null) {
186                 boolean endpointSet = false;
187                 log.debug("default destination endpointName for " + componentName + " = " + endpointName);
188                 if (serviceName != null && endpointName != null) {
189                     endpointName = endpointName.trim();
190                     ServiceEndpoint endpoint = container.getRegistry().getEndpoint(serviceName, endpointName);
191                     if (endpoint != null) {
192                         result.setEndpoint(endpoint);
193                         log.info("Set default destination endpoint for " + componentName + " to " + endpoint);
194                         endpointSet = true;
195                     }
196                 }
197                 if (!endpointSet) {
198                     log.warn("Could not find destination endpoint for " + componentName + " service(" + serviceName
199                             + ") with endpointName " + endpointName);
200                 }
201             }
202         }
203         return result;
204     }
205
206     /**
207      * Create a message exchange factory for the given interface name.
208      *
209      * @param interfaceName name of the interface for which all exchanges created by the returned factory will be set
210      * @return an exchange factory that will create exchanges for the given interface; must be non-null
211      */

212     public MessageExchangeFactory createExchangeFactory(QName JavaDoc interfaceName) {
213         MessageExchangeFactoryImpl result = createMessageExchangeFactory();
214         result.setInterfaceName(interfaceName);
215         return result;
216     }
217
218     /**
219      * Create a message exchange factory for the given service name.
220      *
221      * @param serviceName name of the service for which all exchanges created by the returned factory will be set
222      * @return an exchange factory that will create exchanges for the given service; must be non-null
223      */

224     public MessageExchangeFactory createExchangeFactoryForService(QName JavaDoc serviceName) {
225         MessageExchangeFactoryImpl result = createMessageExchangeFactory();
226         result.setServiceName(serviceName);
227         return result;
228     }
229
230     /**
231      * Create a message exchange factory for the given endpoint.
232      *
233      * @param endpoint endpoint for which all exchanges created by the returned factory will be set for
234      * @return an exchange factory that will create exchanges for the given endpoint
235      */

236     public MessageExchangeFactory createExchangeFactory(ServiceEndpoint endpoint) {
237         MessageExchangeFactoryImpl result = createMessageExchangeFactory();
238         result.setEndpoint(endpoint);
239         return result;
240     }
241
242     protected MessageExchangeFactoryImpl createMessageExchangeFactory() {
243         MessageExchangeFactoryImpl messageExchangeFactory = new MessageExchangeFactoryImpl(idGenerator, closed);
244         messageExchangeFactory.setContext(context);
245         return messageExchangeFactory;
246     }
247
248     /**
249      * @return a MessageExchange - blocking call
250      * @throws MessagingException
251      */

252     public MessageExchange accept() throws MessagingException {
253         return accept(Long.MAX_VALUE);
254     }
255
256     /**
257      * return a MessageExchange
258      *
259      * @param timeoutMS
260      * @return Message Exchange
261      * @throws MessagingException
262      */

263     public MessageExchange accept(long timeoutMS) throws MessagingException {
264         try {
265             checkNotClosed();
266             MessageExchangeImpl me = (MessageExchangeImpl) queue.poll(timeoutMS);
267             if (me != null) {
268                 // If the exchange has already timed out,
269
// do not give it to the component
270
if (me.getPacket().isAborted()) {
271                     if (log.isDebugEnabled()) {
272                         log.debug("Aborted " + me.getExchangeId() + " in " + this);
273                     }
274                     me = null;
275                 } else {
276                     if (log.isDebugEnabled()) {
277                         log.debug("Accepting " + me.getExchangeId() + " in " + this);
278                     }
279                     // If we have a tx lock and the exchange is not active, we need
280
// to notify here without resuming transaction
281
if (me.getTxLock() != null && me.getStatus() != ExchangeStatus.ACTIVE) {
282                         notifyExchange(me.getMirror(), me.getTxLock(), "acceptFinishedExchangeWithTxLock");
283                         me.handleAccept();
284                         if (log.isTraceEnabled()) {
285                             log.trace("Accepted: " + me);
286                         }
287                     }
288                     // We transactionnaly deliver a finished exchange
289
else if (me.isTransacted() && me.getStatus() != ExchangeStatus.ACTIVE) {
290                         // Do not resume transaction
291
me.handleAccept();
292                         if (log.isTraceEnabled()) {
293                             log.trace("Accepted: " + me);
294                         }
295                     }
296                     else {
297                         resumeTx(me);
298                         me.handleAccept();
299                         if (log.isTraceEnabled()) {
300                             log.trace("Accepted: " + me);
301                         }
302                     }
303                 }
304             }
305             return me;
306         }
307         catch (InterruptedException JavaDoc e) {
308             throw new MessagingException("accept failed", e);
309         }
310     }
311     
312     protected void autoSetPersistent(MessageExchangeImpl me) {
313         Boolean JavaDoc persistent = me.getPersistent();
314         if (persistent == null) {
315             if (context.getActivationSpec().getPersistent() != null) {
316                 persistent = context.getActivationSpec().getPersistent();
317             } else {
318                 persistent = Boolean.valueOf(context.getContainer().isPersistent());
319             }
320             me.setPersistent(persistent);
321         }
322     }
323     
324     protected void throttle() {
325         if (component.isExchangeThrottling()) {
326             if (component.getThrottlingInterval() > intervalCount) {
327                 intervalCount = 0;
328                 try {
329                     Thread.sleep(component.getThrottlingTimeout());
330                 }
331                 catch (InterruptedException JavaDoc e) {
332                     log.warn("throttling failed", e);
333                 }
334             }
335             intervalCount++;
336         }
337     }
338
339     protected void doSend(MessageExchangeImpl me, boolean sync) throws MessagingException {
340         MessageExchangeImpl mirror = me.getMirror();
341         boolean finished = me.getStatus() != ExchangeStatus.ACTIVE;
342         try {
343             if (log.isTraceEnabled()) {
344                 log.trace("Sent: " + me);
345             }
346             // If the message has timed out
347
if (me.getPacket().isAborted()) {
348                 throw new ExchangeTimeoutException(me);
349             }
350             // Auto enlist exchange in transaction
351
autoEnlistInTx(me);
352             // Update persistence info
353
autoSetPersistent(me);
354             // Throttle if needed
355
throttle();
356             // Update stats
357
incrementOutboundStats();
358             // Store the consumer component
359
if (me.getRole() == Role.CONSUMER) {
360                 me.setSourceId(component.getComponentNameSpace());
361             }
362             // Call the listeners before the ownership changes
363
container.callListeners(me);
364             me.handleSend(sync);
365             mirror.setTxState(MessageExchangeImpl.TX_STATE_NONE);
366             // If this is the DONE or ERROR status from a synchronous transactional exchange,
367
// it should not be part of the transaction, so remove the tx context
368
if (finished &&
369                 me.getTxLock() == null &&
370                 me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED &&
371                 me.isPushDelivery() == false &&
372                 me.getRole() == Role.CONSUMER) {
373                 me.setTransactionContext(null);
374             }
375             container.sendExchange(mirror);
376         } catch (MessagingException e) {
377             if (log.isDebugEnabled()) {
378                 log.debug("Exception processing: " + me.getExchangeId() + " in " + this);
379             }
380             throw e;
381         } finally {
382             // If there is a tx lock, we need to suspend and notify
383
if (me.getTxLock() != null) {
384                 if (mirror.getTxState() == MessageExchangeImpl.TX_STATE_ENLISTED) {
385                     suspendTx(mirror);
386                 }
387                 synchronized (me.getTxLock()) {
388                     notifyExchange(me, me.getTxLock(), "doSendWithTxLock");
389                 }
390             }
391         }
392     }
393
394     /**
395      * routes a MessageExchange
396      *
397      * @param messageExchange
398      * @throws MessagingException
399      */

400     public void send(MessageExchange messageExchange) throws MessagingException {
401         // If the delivery channel has been closed
402
checkNotClosed();
403         // Log call
404
if (log.isDebugEnabled()) {
405             log.debug("Send " + messageExchange.getExchangeId() + " in " + this);
406         }
407         // // JBI 5.5.2.1.3: remove sync property
408
messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
409         // Call doSend
410
MessageExchangeImpl me = (MessageExchangeImpl) messageExchange;
411         doSend(me, false);
412     }
413
414     /**
415      * routes a MessageExchange
416      *
417      * @param messageExchange
418      * @return true if processed
419      * @throws MessagingException
420      */

421     public boolean sendSync(MessageExchange messageExchange) throws MessagingException {
422         return sendSync(messageExchange, 0);
423     }
424
425     /**
426      * routes a MessageExchange
427      *
428      * @param messageExchange
429      * @param timeoutMS
430      * @return true if processed
431      * @throws MessagingException
432      */

433     public boolean sendSync(MessageExchange messageExchange, long timeout) throws MessagingException {
434         // If the delivery channel has been closed
435
checkNotClosed();
436         // Log call
437
if (log.isDebugEnabled()) {
438             log.debug("SendSync " + messageExchange.getExchangeId() + " in " + this);
439         }
440         boolean result = false;
441         // JBI 5.5.2.1.3: set the sendSync property
442
messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
443         // Call doSend
444
MessageExchangeImpl me = (MessageExchangeImpl) messageExchange;
445         String JavaDoc exchangeKey = getKeyForExchange(me);
446         try {
447             exchangesById.put(exchangeKey, me);
448             // Synchronously send a message and wait for the response
449
synchronized (me) {
450                 doSend(me, true);
451                 if (me.getSyncState() != MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
452                     waitForExchange(me, me, timeout, "sendSync");
453                 } else {
454                     if (log.isDebugEnabled()) {
455                         log.debug("Exchange " + messageExchange.getExchangeId() + " has already been answered (no need to wait)");
456                     }
457                 }
458             }
459             if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
460                 me.handleAccept();
461                 // If the sender flag has been removed, it means
462
// the message has been delivered in the same thread
463
// so there is no need to resume the transaction
464
// See processInBound
465
//if (messageExchangeImpl.getSyncSenderThread() != null) {
466
resumeTx(me);
467                 //}
468
result = true;
469             } else {
470                 // JBI 5.5.2.1.3: the exchange should be set to ERROR status
471
if (log.isDebugEnabled()) {
472                     log.debug("Exchange " + messageExchange.getExchangeId() + " has been aborted");
473                 }
474                 me.getPacket().setAborted(true);
475                 result = false;
476             }
477         } catch (InterruptedException JavaDoc e) {
478             throw new MessagingException(e);
479         } catch (RuntimeException JavaDoc e) {
480             e.printStackTrace();
481             throw e;
482         } finally {
483             exchangesById.remove(exchangeKey);
484         }
485         return result;
486     }
487
488     /**
489      * @return Returns the container.
490      */

491     public JBIContainer getContainer() {
492         return container;
493     }
494
495     /**
496      * @param container The container to set.
497      */

498     public void setContainer(JBIContainer container) {
499         this.container = container;
500     }
501
502     /**
503      * @return Returns the componentConnector.
504      */

505     public ComponentMBeanImpl getComponent() {
506         return component;
507     }
508
509     /**
510      * Get the context
511      *
512      * @return the context
513      */

514     public ComponentContextImpl getContext() {
515         return context;
516     }
517
518     /**
519      * set the context
520      *
521      * @param context
522      */

523     public void setContext(ComponentContextImpl context) {
524         this.context = context;
525     }
526
527     protected void incrementInboundStats() {
528         MessagingStats messagingStats = component.getMessagingStats();
529         long currentTime = System.currentTimeMillis();
530         if (container.isNotifyStatistics()) {
531             long oldCount = messagingStats.getInboundExchanges().getCount();
532             messagingStats.getInboundExchanges().increment();
533             component.firePropertyChanged(
534                     "inboundExchangeCount",
535                     new Long JavaDoc(oldCount),
536                     new Long JavaDoc(messagingStats.getInboundExchanges().getCount()));
537             double oldRate = messagingStats.getInboundExchangeRate().getAverageTime();
538             messagingStats.getInboundExchangeRate().addTime(currentTime - lastReceiveTime);
539             component.firePropertyChanged("inboundExchangeRate",
540                     new Double JavaDoc(oldRate),
541                     new Double JavaDoc(messagingStats.getInboundExchangeRate().getAverageTime()));
542         } else {
543             messagingStats.getInboundExchanges().increment();
544             messagingStats.getInboundExchangeRate().addTime(currentTime - lastReceiveTime);
545         }
546         lastReceiveTime = currentTime;
547     }
548     
549     protected void incrementOutboundStats() {
550         MessagingStats messagingStats = component.getMessagingStats();
551         long currentTime = System.currentTimeMillis();
552         if (container.isNotifyStatistics()) {
553             long oldCount = messagingStats.getOutboundExchanges().getCount();
554             messagingStats.getOutboundExchanges().increment();
555             component.firePropertyChanged(
556                     "outboundExchangeCount",
557                     new Long JavaDoc(oldCount),
558                     new Long JavaDoc(messagingStats.getOutboundExchanges().getCount()));
559             double oldRate = messagingStats.getOutboundExchangeRate().getAverageTime();
560             messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
561             component.firePropertyChanged("outboundExchangeRate",
562                     new Double JavaDoc(oldRate),
563                     new Double JavaDoc(messagingStats.getOutboundExchangeRate().getAverageTime()));
564         } else {
565             messagingStats.getOutboundExchanges().increment();
566             messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
567         }
568         lastSendTime = currentTime;
569     }
570     
571     /**
572      * Used internally for passing in a MessageExchange
573      *
574      * @param me
575      * @throws MessagingException
576      */

577     public void processInBound(MessageExchangeImpl me) throws MessagingException {
578         if (log.isTraceEnabled()) {
579             log.trace("Processing inbound exchange: " + me);
580         }
581         // Check if the delivery channel has been closed
582
checkNotClosed();
583         // Update stats
584
incrementInboundStats();
585
586         // Retrieve the original exchange sent
587
MessageExchangeImpl original = (MessageExchangeImpl) exchangesById.get(getKeyForExchange(me));
588         if (original != null && me != original) {
589             original.copyFrom(me);
590             me = original;
591         }
592         // Check if the incoming exchange is a response to a synchronous exchange previously sent
593
// In this case, we do not have to queue it, but rather notify the waiting thread.
594
if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
595             // If the mirror has been delivered using push, better wait until
596
// the push call return. This can only work if not using clustered flows,
597
// but the flag is transient so we do not care.
598
/*if (!me.getMirror().isPushDelivery())*/ {
599                 // Ensure that data is uptodate with the incoming exchange (in case the exchange has
600
// been serialized / deserialized by a clustered flow)
601
suspendTx(original);
602                 me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
603                 notifyExchange(original, original, "processInboundSynchronousExchange");
604             }
605             return;
606         }
607
608         // If the component implements the MessageExchangeListener,
609
// the delivery can be made synchronously, so we don't need
610
// to bother with transactions
611
MessageExchangeListener listener = getExchangeListener();
612         if (listener != null) {
613             me.handleAccept();
614             if (log.isTraceEnabled()) {
615                 log.trace("Received: " + me);
616             }
617             // Set the flag the the exchange was delivered using push mode
618
// This is important for transaction boundaries
619
me.setPushDeliver(true);
620             // Deliver the exchange
621
ClassLoader JavaDoc old = Thread.currentThread().getContextClassLoader();
622             try {
623                 Thread.currentThread().setContextClassLoader(component.getComponent().getClass().getClassLoader());
624                 listener.onMessageExchange(me);
625             } finally {
626                 Thread.currentThread().setContextClassLoader(old);
627             }
628             // TODO: handle delayed exchange notifications
629
return;
630         }
631         
632         // Component uses pull delivery.
633

634         // If the exchange is transacted, special care should be taken.
635
// But if the exchange is no more ACTIVE, just queue it, as
636
// we will never have an answer back.
637
if (me.isTransacted() && me.getStatus() == ExchangeStatus.ACTIVE) {
638             // If the transaction is conveyed by the exchange
639
// We do not need to resume the transaction in this thread
640
if (me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED) {
641                 try {
642                     suspendTx(me);
643                     queue.put(me);
644                 } catch (InterruptedException JavaDoc e) {
645                     log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
646                     me.getPacket().setAborted(true);
647                 }
648             }
649             // Else the delivery / send are enlisted in the current tx.
650
// We must suspend the transaction, queue it, and wait for the answer
651
// to be sent, at which time the tx should be suspended and resumed in
652
// this thread.
653
else {
654                 Object JavaDoc lock = new Object JavaDoc();
655                 synchronized (lock) {
656                     try {
657                         me.setTxLock(lock);
658                         suspendTx(me);
659                         queue.put(me);
660                         waitForExchange(me, lock, 0, "processInboundTransactionalExchange");
661                     } catch (InterruptedException JavaDoc e) {
662                         log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
663                         me.getPacket().setAborted(true);
664                     } finally {
665                         me.setTxLock(null);
666                         resumeTx(me);
667                     }
668                 }
669             }
670         }
671         // If the exchange is ACTIVE, the transaction boundary will suspended when the
672
// answer is sent
673
// Else just queue the exchange
674
else {
675             try {
676                 queue.put(me);
677             } catch (InterruptedException JavaDoc e) {
678                 log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
679                 me.getPacket().setAborted(true);
680             }
681         }
682     }
683     
684     protected MessageExchangeListener getExchangeListener() {
685         Component component = this.component.getComponent();
686         if (component instanceof MessageExchangeListener) {
687             return (MessageExchangeListener) component;
688         }
689         ComponentLifeCycle lifecycle = this.component.getLifeCycle();
690         if (lifecycle instanceof MessageExchangeListener) {
691             return (MessageExchangeListener) lifecycle;
692         }
693         return null;
694     }
695     
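/**
 * Blocks the calling thread on the given lock until it is notified or the
 * timeout elapses. The caller must own the monitor of <code>lock</code>,
 * i.e. this method must be called from within
 * <code>synchronized (lock)</code>.
 *
 * @param me the exchange being waited for (used for logging only)
 * @param lock the object to wait on
 * @param timeout the maximum time to wait in milliseconds; 0 waits until notified
 * @param from a label identifying the caller, used in the debug log
 * @throws InterruptedException if the thread is interrupted while waiting
 */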

702     protected void waitForExchange(MessageExchangeImpl me, Object JavaDoc lock, long timeout, String JavaDoc from) throws InterruptedException JavaDoc {
703         // If the channel is closed while here, we must abort
704
if (log.isDebugEnabled()) {
705             log.debug("Waiting for exchange " + me.getExchangeId() + " (" + Integer.toHexString(me.hashCode()) + ") to be answered in " + this + " from " + from);
706         }
707         Thread JavaDoc th = Thread.currentThread();
708         try {
709             waiters.put(th, Boolean.TRUE);
710             lock.wait(timeout);
711         } finally {
712             waiters.remove(th);
713         }
714         if (log.isDebugEnabled()) {
715             log.debug("Notified: " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from " + from);
716         }
717     }
718     
719     protected void notifyExchange(MessageExchangeImpl me, Object JavaDoc lock, String JavaDoc from) {
720         if (log.isDebugEnabled()) {
721             log.debug("Notifying exchange " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from " + from);
722         }
723         synchronized (lock) {
724             lock.notify();
725         }
726     }
727
728     /**
729      * Get Inbound Factory
730      *
731      * @return the inbound message factory
732      */

733     public MessageExchangeFactory getInboundFactory() {
734         if (inboundFactory == null) {
735             inboundFactory = createExchangeFactory();
736         }
737         return inboundFactory;
738     }
739
740     protected void suspendTx(MessageExchangeImpl me) {
741         try {
742             Transaction JavaDoc oldTx = me.getTransactionContext();
743             if (oldTx != null) {
744                 TransactionManager JavaDoc tm = (TransactionManager JavaDoc) container.getTransactionManager();
745                 if (tm != null) {
746                     if (log.isDebugEnabled()) {
747                         log.debug("Suspending transaction for " + me.getExchangeId() + " in " + this);
748                     }
749                     Transaction JavaDoc tx = tm.suspend();
750                     if (tx != oldTx) {
751                         throw new IllegalStateException JavaDoc("the transaction context set in the messageExchange is not bound to the current thread");
752                     }
753                 }
754             }
755         } catch (Exception JavaDoc e) {
756             log.info("Exchange " + me.getExchangeId() + " aborted due to transaction exception", e);
757             me.getPacket().setAborted(true);
758         }
759     }
760
761     protected void resumeTx(MessageExchangeImpl me) throws MessagingException {
762         try {
763             Transaction JavaDoc oldTx = me.getTransactionContext();
764             if (oldTx != null) {
765                 TransactionManager JavaDoc tm = (TransactionManager JavaDoc) container.getTransactionManager();
766                 if (tm != null) {
767                     if (log.isDebugEnabled()) {
768                         log.debug("Resuming transaction for " + me.getExchangeId() + " in " + this);
769                     }
770                     tm.resume(oldTx);
771                 }
772             }
773         } catch (Exception JavaDoc e) {
774             throw new MessagingException(e);
775         }
776     }
777
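/**
 * If the JBI container is configured to do so, the message exchange is
 * automatically enlisted in the current transaction, if one exists.
 *
 * @param me the message exchange to enlist
 * @throws MessagingException if the exchange already carries a different
 *         transaction than the one bound to the current thread, or if the
 *         transaction manager fails
 */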

785     protected void autoEnlistInTx(MessageExchangeImpl me) throws MessagingException {
786         try {
787             if (container.isAutoEnlistInTransaction()) {
788                 TransactionManager JavaDoc tm = (TransactionManager JavaDoc) container.getTransactionManager();
789                 if (tm != null) {
790                     Transaction JavaDoc tx = tm.getTransaction();
791                     if (tx != null) {
792                         Object JavaDoc oldTx = me.getTransactionContext();
793                         if (oldTx == null) {
794                             me.setTransactionContext(tx);
795                         } else if (oldTx != tx) {
796                             throw new IllegalStateException JavaDoc(
797                                     "the transaction context set in the messageExchange is not bound to the current thread");
798                         }
799                     }
800                 }
801             }
802         } catch (Exception JavaDoc e) {
803             throw new MessagingException(e);
804         }
805     }
806
807     /**
808      * @return pretty print
809      */

810     public String JavaDoc toString() {
811         return "DeliveryChannel{" + component.getName() + "}";
812     }
813     
814     private String JavaDoc getKeyForExchange(MessageExchangeImpl me) {
815         return (me.getRole() == Role.CONSUMER ? "consumer:" : "provider:") + me.getExchangeId();
816     }
817 }
818
Popular Tags