KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jms > jca > JcaConsumerProcessor


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package org.apache.servicemix.jms.jca;
18
19 import java.util.Map JavaDoc;
20
21 import javax.jbi.messaging.DeliveryChannel;
22 import javax.jbi.messaging.ExchangeStatus;
23 import javax.jbi.messaging.InOnly;
24 import javax.jbi.messaging.MessageExchange;
25 import javax.jms.Connection JavaDoc;
26 import javax.jms.ConnectionFactory JavaDoc;
27 import javax.jms.Message JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.MessageProducer JavaDoc;
30 import javax.jms.Session JavaDoc;
31 import javax.resource.spi.ActivationSpec JavaDoc;
32 import javax.resource.spi.BootstrapContext JavaDoc;
33 import javax.resource.spi.ResourceAdapter JavaDoc;
34 import javax.resource.spi.endpoint.MessageEndpointFactory JavaDoc;
35 import javax.transaction.TransactionManager JavaDoc;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.servicemix.common.AsyncBaseLifeCycle;
40 import org.apache.servicemix.common.BaseLifeCycle;
41 import org.apache.servicemix.jms.AbstractJmsProcessor;
42 import org.apache.servicemix.jms.JmsEndpoint;
43 import org.apache.servicemix.soap.Context;
44 import org.jencks.SingletonEndpointFactory;
45
46 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
47
48 /**
49  *
50  * @author <a HREF="mailto:gnodet [at] gmail.com">Guillaume Nodet</a>
51  */

52 public class JcaConsumerProcessor extends AbstractJmsProcessor implements MessageListener JavaDoc {
53
54     private static final Log log = LogFactory.getLog(JcaConsumerProcessor.class);
55
56     protected Map JavaDoc pendingMessages = new ConcurrentHashMap();
57     protected DeliveryChannel channel;
58     protected ResourceAdapter JavaDoc resourceAdapter;
59     protected MessageEndpointFactory JavaDoc endpointFactory;
60     protected ActivationSpec JavaDoc activationSpec;
61     protected BootstrapContext JavaDoc bootstrapContext;
62     protected TransactionManager JavaDoc transactionManager;
63     protected ConnectionFactory JavaDoc connectionFactory;
64     
65     public JcaConsumerProcessor(JmsEndpoint endpoint) {
66         super(endpoint);
67     }
68
69     public void start() throws Exception JavaDoc {
70         AsyncBaseLifeCycle lf = (AsyncBaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
71         channel = lf.getContext().getDeliveryChannel();
72         transactionManager = (TransactionManager JavaDoc) lf.getContext().getTransactionManager();
73         endpointFactory = new SingletonEndpointFactory(this, transactionManager);
74         bootstrapContext = endpoint.getBootstrapContext();
75         if (bootstrapContext == null) {
76             throw new IllegalArgumentException JavaDoc("bootstrapContext not set");
77         }
78         connectionFactory = endpoint.getConnectionFactory();
79         if (connectionFactory == null) {
80             throw new IllegalArgumentException JavaDoc("connectionFactory not set");
81         }
82         activationSpec = endpoint.getActivationSpec();
83         if (activationSpec == null) {
84             throw new IllegalArgumentException JavaDoc("activationSpec not set");
85         }
86         resourceAdapter = endpoint.getResourceAdapter();
87         if (resourceAdapter == null) {
88             resourceAdapter = activationSpec.getResourceAdapter();
89         } else if (activationSpec.getResourceAdapter() == null) {
90             activationSpec.setResourceAdapter(resourceAdapter);
91         } else {
92             throw new IllegalArgumentException JavaDoc("resourceAdapter not set");
93         }
94         resourceAdapter.start(bootstrapContext);
95         resourceAdapter.endpointActivation(endpointFactory, activationSpec);
96     }
97
98     public void stop() throws Exception JavaDoc {
99         resourceAdapter.endpointDeactivation(endpointFactory, activationSpec);
100         pendingMessages.clear();
101     }
102
103     public void onMessage(final Message JavaDoc message) {
104         try {
105             if (log.isDebugEnabled()) {
106                 log.debug("Received jms message " + message);
107             }
108             Context JavaDoc context = createContext();
109             MessageExchange exchange = toNMS(message, context);
110             if (exchange instanceof InOnly == false) {
111                 throw new UnsupportedOperationException JavaDoc("JCA consumer endpoints can only use InOnly MEP");
112             }
113             exchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, transactionManager.getTransaction());
114             pendingMessages.put(exchange.getExchangeId(), context);
115             if (endpoint.isSynchronous()) {
116                 channel.sendSync(exchange);
117                 process(exchange);
118             } else {
119                 BaseLifeCycle lf = (BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle();
120                 lf.sendConsumerExchange(exchange, JcaConsumerProcessor.this.endpoint);
121             }
122         } catch (Throwable JavaDoc e) {
123             log.error("Error while handling jms message", e);
124         }
125     }
126
127     public void process(MessageExchange exchange) throws Exception JavaDoc {
128         Context JavaDoc context = (Context JavaDoc) pendingMessages.remove(exchange.getExchangeId());
129         Message JavaDoc message = (Message JavaDoc) context.getProperty(Message JavaDoc.class.getName());
130         Message JavaDoc response = null;
131         Connection JavaDoc connection = null;
132         try {
133             if (exchange.getStatus() == ExchangeStatus.DONE) {
134                 return;
135             }
136             connection = connectionFactory.createConnection();
137             Session JavaDoc session = connection.createSession(true, Session.SESSION_TRANSACTED);
138             response = fromNMSResponse(exchange, context, session);
139             if (response != null) {
140                 MessageProducer JavaDoc producer = session.createProducer(message.getJMSReplyTo());
141                 response.setJMSCorrelationID(message.getJMSCorrelationID());
142                 producer.send(response);
143             }
144         } finally {
145             if (connection != null) {
146                 connection.close();
147             }
148             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
149                 exchange.setStatus(ExchangeStatus.DONE);
150                 channel.send(exchange);
151             }
152         }
153     }
154
155 }
156
Popular Tags