KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > transports > jms > JMSClientTransport


1 package org.objectweb.celtix.bus.transports.jms;
2
3 import java.io.ByteArrayInputStream JavaDoc;
4 import java.io.ByteArrayOutputStream JavaDoc;
5 import java.io.IOException JavaDoc;
6 import java.util.concurrent.Executor JavaDoc;
7 import java.util.concurrent.Future JavaDoc;
8 import java.util.logging.Level JavaDoc;
9 import java.util.logging.Logger JavaDoc;
10
11 import javax.jms.Destination JavaDoc;
12 import javax.jms.JMSException JavaDoc;
13 import javax.jms.Message JavaDoc;
14 import javax.jms.Queue JavaDoc;
15 import javax.jms.QueueSender JavaDoc;
16 import javax.jms.TextMessage JavaDoc;
17 import javax.jms.Topic JavaDoc;
18 import javax.jms.TopicPublisher JavaDoc;
19 import javax.naming.NamingException JavaDoc;
20 import javax.wsdl.Port;
21 import javax.wsdl.WSDLException;
22 import javax.xml.ws.handler.MessageContext;
23
24 import org.objectweb.celtix.Bus;
25 import org.objectweb.celtix.bindings.ClientBinding;
26 import org.objectweb.celtix.bindings.ResponseCallback;
27 import org.objectweb.celtix.bus.management.counters.TransportClientCounters;
28 import org.objectweb.celtix.common.logging.LogUtils;
29 import org.objectweb.celtix.configuration.Configuration;
30
31 import org.objectweb.celtix.context.InputStreamMessageContext;
32 import org.objectweb.celtix.context.OutputStreamMessageContext;
33 import org.objectweb.celtix.transports.ClientTransport;
34 import org.objectweb.celtix.transports.jms.JMSClientBehaviorPolicyType;
35 import org.objectweb.celtix.transports.jms.context.JMSMessageHeadersType;
36 import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
37 import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
38
39
40
41 public class JMSClientTransport extends JMSTransportBase implements ClientTransport {
42     
43     private static final Logger JavaDoc LOG = LogUtils.getL7dLogger(JMSClientTransport.class);
44     private static final long DEFAULT_RECEIVE_TIMEOUT = 0;
45     
46     protected boolean textPayload;
47     TransportClientCounters counters;
48     private JMSClientBehaviorPolicyType clientBehaviourPolicy;
49     private ResponseCallback responseCallback;
50     
51     public JMSClientTransport(Bus bus,
52                               EndpointReferenceType address,
53                               ClientBinding binding)
54         throws WSDLException, IOException JavaDoc {
55
56         super(bus, address, false);
57         clientBehaviourPolicy = getClientPolicy(configuration);
58         counters = new TransportClientCounters("JMSClientTransport");
59         
60         EndpointReferenceUtils.setAddress(address, getAddrUriFromJMSAddrPolicy());
61         targetEndpoint = address;
62         
63         textPayload =
64             JMSConstants.TEXT_MESSAGE_TYPE.equals(clientBehaviourPolicy.getMessageType().value());
65         
66         LOG.log(Level.FINE, "TEXT_MESSAGE_TYPE: " , textPayload);
67         LOG.log(Level.FINE, "QUEUE_DESTINATION_STYLE: " , queueDestinationStyle);
68         if (binding != null) {
69             responseCallback = binding.createResponseCallback();
70         }
71         entry("JMSClientTransport Constructor");
72     }
73
74     private JMSClientBehaviorPolicyType getClientPolicy(Configuration conf) {
75         JMSClientBehaviorPolicyType pol = conf.getObject(JMSClientBehaviorPolicyType.class, "jmsClient");
76         if (pol == null) {
77             pol = new JMSClientBehaviorPolicyType();
78         }
79         return pol;
80     }
81     
82     public JMSClientBehaviorPolicyType getJMSClientBehaviourPolicy() {
83         
84         return clientBehaviourPolicy;
85     }
86     
87     //TODO: Revisit for proper implementation and changes if any.
88

89     public void shutdown() {
90         entry("JMSClientTransport shutdown()");
91
92         // ensure resources held by session factory are released
93
//
94
if (sessionFactory != null) {
95             sessionFactory.shutdown();
96         }
97     }
98     
99     public EndpointReferenceType getTargetEndpoint() {
100         return targetEndpoint;
101     }
102     
103     public EndpointReferenceType getDecoupledEndpoint() throws IOException JavaDoc {
104         
105         if (jmsAddressPolicy.getJndiReplyDestinationName() != null) {
106             EndpointReferenceType epr = new EndpointReferenceType();
107             EndpointReferenceUtils.setAddress(epr, getReplyTotAddrUriFromJMSAddrPolicy());
108             return epr;
109         }
110         
111         return null;
112     }
113     
114     public Port getPort() {
115         return port;
116     }
117     
118     public OutputStreamMessageContext createOutputStreamContext(MessageContext context) throws IOException JavaDoc {
119         return new JMSOutputStreamContext(context);
120     }
121
122
123     public void finalPrepareOutputStreamContext(OutputStreamMessageContext context) throws IOException JavaDoc {
124     }
125
126     public InputStreamMessageContext invoke(OutputStreamMessageContext context)
127         throws IOException JavaDoc {
128         
129         if (!queueDestinationStyle) {
130             LOG.log(Level.WARNING, "Non-oneway invocations not supported for JMS Topics");
131             throw new IOException JavaDoc("Non-oneway invocations not supported for JMS Topics");
132         }
133
134         try {
135             byte[] responseData = null;
136             if (textPayload) {
137                 String JavaDoc responseString = (String JavaDoc)invoke(context, true);
138                 responseData = responseString.getBytes();
139             } else {
140                 responseData = (byte[])invoke(context, true);
141             }
142             counters.getInvoke().increase();
143             JMSInputStreamContext respContext =
144                 new JMSInputStreamContext(new ByteArrayInputStream JavaDoc(responseData));
145             
146             if (context.containsKey(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS)) {
147                 JMSMessageHeadersType responseHdr =
148                     (JMSMessageHeadersType)context.remove(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
149                 respContext.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, responseHdr);
150                 respContext.setScope(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS,
151                                      MessageContext.Scope.APPLICATION);
152             }
153                         
154             return respContext;
155         } catch (Exception JavaDoc ex) {
156             //TODO: decide what to do with the exception.
157
counters.getInvokeError().increase();
158             throw new IOException JavaDoc(ex.getMessage());
159         }
160     }
161
162
163     /**
164      * Variant on invoke used for oneways.
165      *
166      * @param request the buffer to send
167      */

168     public void invokeOneway(OutputStreamMessageContext context) throws IOException JavaDoc {
169         try {
170             invoke(context, false);
171             counters.getInvokeOneWay();
172         } catch (Exception JavaDoc ex) {
173             counters.getInvokeError().increase();
174             throw new IOException JavaDoc(ex.getMessage());
175         }
176     }
177
178     public Future JavaDoc<InputStreamMessageContext> invokeAsync(OutputStreamMessageContext context,
179                                                          Executor JavaDoc executor)
180         throws IOException JavaDoc {
181         return null;
182     }
183
184     public ResponseCallback getResponseCallback() {
185         return responseCallback;
186     }
187
188     /**
189      * Internal invoke mechanics.
190      *
191      * @param request the buffer to send
192      * @param responseExpected true iff a response is expected
193      * @return the response buffer if expected
194      */

195     private Object JavaDoc invoke(OutputStreamMessageContext context, boolean responseExpected)
196         throws JMSException JavaDoc, NamingException JavaDoc {
197         entry("JMSClientTransport invoke()");
198
199         try {
200             if (null == sessionFactory) {
201                 JMSProviderHub.connect(this);
202             }
203         } catch (JMSException JavaDoc ex) {
204             LOG.log(Level.FINE, "JMS connect failed with JMSException : ", ex);
205             throw ex;
206         } catch (NamingException JavaDoc e) {
207             LOG.log(Level.FINE, "JMS connect failed with NamingException : ", e);
208             throw e;
209         }
210
211         if (sessionFactory == null) {
212             throw new java.lang.IllegalStateException JavaDoc("JMSClientTransport not connected");
213         }
214
215         PooledSession pooledSession = sessionFactory.get(responseExpected);
216         send(pooledSession, context, responseExpected);
217
218         Object JavaDoc response = null;
219
220         if (responseExpected) {
221             response = receive(pooledSession, context);
222         }
223
224         sessionFactory.recycle(pooledSession);
225
226         return response;
227     }
228
229
230     /**
231      * Send mechanics.
232      *
233      * @param request the request buffer
234      * @param pooledSession the shared JMS resources
235      */

236     private void send(PooledSession pooledSession,
237                               OutputStreamMessageContext context,
238                               boolean responseExpected)
239         throws JMSException JavaDoc {
240         Object JavaDoc request;
241
242         if (textPayload) {
243             request = context.getOutputStream().toString();
244         } else {
245             request = ((ByteArrayOutputStream JavaDoc)context.getOutputStream()).toByteArray();
246         }
247
248         Destination JavaDoc replyTo = pooledSession.destination();
249
250         //We don't want to send temp queue in
251
//replyTo header for oneway calls
252
if (!responseExpected
253             && (jmsAddressPolicy.getJndiReplyDestinationName() == null)) {
254             replyTo = null;
255         }
256
257         Message JavaDoc message = marshal(request, pooledSession.session(), replyTo,
258                                   clientBehaviourPolicy.getMessageType().value());
259       // message.get
260

261         JMSMessageHeadersType headers =
262             (JMSMessageHeadersType)context.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
263
264
265         int deliveryMode = getJMSDeliveryMode(headers);
266         int priority = getJMSPriority(headers);
267         String JavaDoc correlationID = getCorrelationId(headers);
268         long ttl = getTimeToLive(headers);
269         if (ttl <= 0) {
270             ttl = DEFAULT_RECEIVE_TIMEOUT;
271         }
272         
273         setMessageProperties(headers, message);
274         if (responseExpected) {
275             String JavaDoc id = pooledSession.getCorrelationID();
276
277             if (id != null) {
278                 if (correlationID != null) {
279                     String JavaDoc error = "User cannot set JMSCorrelationID when "
280                         + "making a request/reply invocation using "
281                         + "a static replyTo Queue.";
282                     throw new JMSException JavaDoc(error);
283                 }
284                 correlationID = id;
285             }
286         }
287
288         if (correlationID != null) {
289             message.setJMSCorrelationID(correlationID);
290         } else {
291             //No message correlation id is set. Whatever comeback will be accepted as responses.
292
// We assume that it will only happen in case of the temp. reply queue.
293
}
294
295         LOG.log(Level.FINE, "client sending request: ", message);
296
297         if (queueDestinationStyle) {
298             QueueSender JavaDoc sender = (QueueSender JavaDoc)pooledSession.producer();
299             sender.setTimeToLive(ttl);
300             sender.send((Queue JavaDoc)targetDestination, message, deliveryMode, priority, ttl);
301         } else {
302             TopicPublisher JavaDoc publisher = (TopicPublisher JavaDoc)pooledSession.producer();
303             publisher.setTimeToLive(ttl);
304             publisher.publish((Topic JavaDoc)targetDestination, message, deliveryMode, priority, ttl);
305         }
306     }
307
308
309     /**
310      * Receive mechanics.
311      *
312      * @param pooledSession the shared JMS resources
313      * @retrun the response buffer
314      */

315     private Object JavaDoc receive(PooledSession pooledSession,
316                            OutputStreamMessageContext context)
317         throws JMSException JavaDoc {
318         Object JavaDoc response = null;
319         
320         long timeout = DEFAULT_RECEIVE_TIMEOUT;
321
322         Long JavaDoc receiveTimeout = (Long JavaDoc)context.get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
323
324         if (receiveTimeout != null) {
325             timeout = receiveTimeout.longValue();
326         }
327         
328         Message JavaDoc message = pooledSession.consumer().receive(timeout);
329         LOG.log(Level.FINE, "client received reply: " , message);
330
331         if (message != null) {
332             
333             populateIncomingContext(message, context, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
334             String JavaDoc messageType = message instanceof TextMessage JavaDoc
335                         ? JMSConstants.TEXT_MESSAGE_TYPE : JMSConstants.BINARY_MESSAGE_TYPE;
336             response = unmarshal(message, messageType);
337             return response;
338         } else {
339             String JavaDoc error = "JMSClientTransport.receive() timed out. No message available.";
340             LOG.log(Level.SEVERE, error);
341             //TODO: Review what exception should we throw.
342
//throw new JMSException(error);
343
return null;
344         }
345     }
346 }
347
Popular Tags