KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > components > util > OutBinding


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.components.util;
18
19 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.servicemix.MessageExchangeListener;
24
25 import javax.jbi.JBIException;
26 import javax.jbi.messaging.DeliveryChannel;
27 import javax.jbi.messaging.ExchangeStatus;
28 import javax.jbi.messaging.MessageExchange;
29 import javax.jbi.messaging.MessagingException;
30 import javax.jbi.messaging.NormalizedMessage;
31
32 /**
33  * A base class for bindings which process inbound JBI messages
34  *
35  * @version $Revision: 426415 $
36  */

37 public abstract class OutBinding extends ComponentSupport implements Runnable JavaDoc, MessageExchangeListener {
38     private static final Log log = LogFactory.getLog(OutBinding.class);
39     private AtomicBoolean stop = new AtomicBoolean(false);
40     private Thread JavaDoc runnable;
41
42     public OutBinding() {
43     }
44
45     public void onMessageExchange(MessageExchange exchange) throws MessagingException {
46         if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
47             try {
48                 NormalizedMessage message = getInMessage(exchange);
49                 process(exchange, message);
50             }
51             catch (Exception JavaDoc e) {
52                 if (log.isDebugEnabled()) {
53                     log.debug("Exchange failed", e);
54                 }
55                 fail(exchange, e);
56             }
57         }
58     }
59
60     /**
61      * Runnable implementation
62      */

63     public void run() {
64         try {
65             DeliveryChannel deliveryChannel = getDeliveryChannel();
66             while (!stop.get()) {
67                 MessageExchange exchange = deliveryChannel.accept();
68                 if (exchange != null) {
69                     try {
70                         onMessageExchange(exchange);
71                     } catch (MessagingException e) {
72                         log.error("MessageExchange processing failed", e);
73                     }
74                 }
75             }
76         }
77         catch (MessagingException e) {
78             log.error("run failed", e);
79         }
80     }
81
82     /**
83      * shutdown
84      *
85      * @throws JBIException
86      */

87     public void shutDown() throws JBIException {
88     }
89
90     /**
91      * stop
92      *
93      * @throws JBIException
94      */

95     public void stop() throws JBIException {
96         stop.compareAndSet(true, false);
97         if (runnable != null) {
98             runnable.interrupt();
99             try {
100                 runnable.join();
101             } catch (InterruptedException JavaDoc e) {
102                 log.warn("Unable to stop component polling thread", e);
103             }
104             runnable = null;
105         }
106     }
107
108     /**
109      * start
110      */

111     public void start() throws JBIException {
112         if (stop.compareAndSet(false, true)) {
113             runnable = new Thread JavaDoc(this);
114             runnable.setDaemon(true);
115             runnable.start();
116         }
117     }
118
119     /**
120      * Process incoming exchange.
121      * The exchange is in the ACTIVE state.
122      * The method should end by a call to done() or answer().
123      * When an exception is thrown, the fail() method will be called.
124      *
125      * @param messageExchange the exchange to process
126      * @param message the input message of the exchange
127      * @throws Exception if an error occurs
128      */

129     protected abstract void process(MessageExchange messageExchange, NormalizedMessage message) throws Exception JavaDoc;
130 }
131
Popular Tags