KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > transports > jms > JMSSessionFactory


1 package org.objectweb.celtix.bus.transports.jms;
2
3 import java.net.InetAddress JavaDoc;
4 import java.net.UnknownHostException JavaDoc;
5 import java.util.Calendar JavaDoc;
6 import java.util.logging.Level JavaDoc;
7 import java.util.logging.Logger JavaDoc;
8
9 import javax.jms.Connection JavaDoc;
10 import javax.jms.Destination JavaDoc;
11 import javax.jms.JMSException JavaDoc;
12 import javax.jms.MessageConsumer JavaDoc;
13 import javax.jms.Queue JavaDoc;
14 import javax.jms.QueueConnection JavaDoc;
15 import javax.jms.QueueSession JavaDoc;
16 import javax.jms.Session JavaDoc;
17 import javax.jms.Topic JavaDoc;
18 import javax.jms.TopicConnection JavaDoc;
19 import javax.jms.TopicSession JavaDoc;
20 import javax.jms.TopicSubscriber JavaDoc;
21 import javax.naming.Context JavaDoc;
22 import javax.naming.NamingException JavaDoc;
23
24 import org.objectweb.celtix.common.logging.LogUtils;
25 import org.objectweb.celtix.common.util.AbstractTwoStageCache;
26 import org.objectweb.celtix.transports.jms.JMSAddressPolicyType;
27 import org.objectweb.celtix.transports.jms.JMSServerBehaviorPolicyType;
28
29
30 /**
31  * This class encapsulates the creation and pooling logic for JMS Sessions.
32  * The usage patterns for sessions, producers & consumers are as follows ...
33  * <p>
34  * client-side: an invoking thread requires relatively short-term exclusive
35  * use of a session, an unidentified producer to send the request message,
36  * and in the point-to-point domain a consumer for the temporary ReplyTo
37  * destination to synchronously receive the reply if the operation is twoway
38  * (in the pub-sub domain only oneway operations are supported, so a there
39  * is never a requirement for a reply destination)
40  * <p>
41  * server-side receive: each port based on <jms:address> requires relatively
42  * long-term exclusive use of a session, a consumer with a MessageListener for
43  * the JMS destination specified for the port, and an unidentified producer
44  * to send the request message
45  * <p>
46  * server-side send: each dispatch of a twoway request requires relatively
47  * short-term exclusive use of a session and an indentified producer (but
48  * not a consumer) - note that the session used for the recieve side cannot
49  * be re-used for the send, as MessageListener usage precludes any synchronous
50  * sends or receives on that session
51  * <p>
52  * So on the client-side, pooling of sessions is bound up with pooling
53  * of temporary reply destinations, whereas on the server receive side
54  * the benefit of pooling is marginal as the session is required from
55  * the point at which the port was activated until the Bus is shutdown
56  * The server send side resembles the client side,
57  * except that a consumer for the temporary destination is never required.
58  * Hence different pooling strategies make sense ...
59  * <p>
60  * client-side: a SoftReference-based cache of send/receive sessions is
61  * maintained containing an aggregate of a session, indentified producer,
62  * temporary reply destination & consumer for same
63  * <p>
64  * server-side receive: as sessions cannot be usefully recycled, they are
65  * simply created on demand and closed when no longer required
66  * <p>
67  * server-side send: a SoftReference-based cache of send-only sessions is
68  * maintained containing an aggregate of a session and an indentified producer
69  * <p>
70  * In a pure client or pure server, only a single cache is ever
71  * populated. Where client and server logic is co-located, a client
72  * session retrieval for a twoway invocation checks the reply-capable
73  * cache first and then the send-only cache - if a session is
74  * available in the later then its used after a tempory destination is
75  * created before being recycled back into the reply-capable cache. A
76  * server send side retrieval or client retrieval for a oneway
77  * invocation checks the send-only cache first and then the
78  * reply-capable cache - if a session is available in the later then
79  * its used and the tempory destination is ignored. So in the
80  * co-located case, sessions migrate from the send-only cache to the
81  * reply-capable cache as necessary.
82  * <p>
83  *
84  * @author Eoghan Glynn
85  */

86 public class JMSSessionFactory {
87
88     private static final int CACHE_HIGH_WATER_MARK = 500;
89     private static final Logger JavaDoc LOG = LogUtils.getL7dLogger(JMSSessionFactory.class);
90     private static final int PRIMARY_CACHE_MAX = 20;
91
92     private final Context JavaDoc initialContext;
93     private final Connection JavaDoc theConnection;
94     private AbstractTwoStageCache<PooledSession> replyCapableSessionCache;
95     private AbstractTwoStageCache<PooledSession> sendOnlySessionCache;
96     private final Destination JavaDoc theReplyDestination;
97     private final boolean isQueueConnection;
98  
99     private final JMSAddressPolicyType addressExtensor;
100     private final JMSServerBehaviorPolicyType jmsServerPolicy;
101
102     /**
103      * Constructor.
104      *
105      * @param connection the shared {Queue|Topic}Connection
106      */

107     public JMSSessionFactory(Connection JavaDoc connection,
108                              Destination JavaDoc replyDestination,
109                              JMSAddressPolicyType addrExt,
110                              JMSServerBehaviorPolicyType serverPolicy,
111                              Context JavaDoc context) {
112         theConnection = connection;
113         theReplyDestination = replyDestination;
114         addressExtensor = addrExt;
115         isQueueConnection = addressExtensor.getDestinationStyle().value().equals(JMSConstants.JMS_QUEUE);
116         jmsServerPolicy = serverPolicy;
117         initialContext = context;
118
119         // create session caches (REVISIT sizes should be configurable)
120
//
121
if (isQueueConnection) {
122             // the reply capable cache is only required in the point-to-point
123
// domain
124
//
125
replyCapableSessionCache =
126                 new AbstractTwoStageCache<PooledSession>(
127                             PRIMARY_CACHE_MAX,
128                             CACHE_HIGH_WATER_MARK,
129                             0,
130                             this) {
131                     public final PooledSession create() throws JMSException JavaDoc {
132                         return createPointToPointReplyCapableSession();
133                     }
134                 };
135
136             try {
137                 replyCapableSessionCache.populateCache();
138             } catch (Throwable JavaDoc t) {
139                 LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
140             }
141
142             // send-only cache for point-to-point oneway requests and replies
143
//
144
sendOnlySessionCache =
145                 new AbstractTwoStageCache<PooledSession>(
146                             PRIMARY_CACHE_MAX,
147                             CACHE_HIGH_WATER_MARK,
148                             0,
149                             this) {
150                     public final PooledSession create() throws JMSException JavaDoc {
151                         return createPointToPointSendOnlySession();
152                     }
153                 };
154
155             try {
156                 sendOnlySessionCache.populateCache();
157             } catch (Throwable JavaDoc t) {
158                 LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
159             }
160         } else {
161             // send-only cache for pub-sub oneway requests
162
//
163
sendOnlySessionCache =
164                 new AbstractTwoStageCache<PooledSession>(
165                            PRIMARY_CACHE_MAX,
166                            CACHE_HIGH_WATER_MARK,
167                            0,
168                            this) {
169                     public final PooledSession create() throws JMSException JavaDoc {
170                         return createPubSubSession(true, false, null);
171                     }
172                 };
173
174             try {
175                 sendOnlySessionCache.populateCache();
176             } catch (Throwable JavaDoc t) {
177                 LOG.log(Level.FINE, "JMS Session cache populate failed: " + t);
178             }
179         }
180     }
181
182     //--java.lang.Object Overrides----------------------------------------------
183
public String JavaDoc toString() {
184         return "JMSSessionFactory";
185     }
186
187
188     //--Methods-----------------------------------------------------------------
189
protected Connection JavaDoc getConnection() {
190         return theConnection;
191     }
192
193     public Queue JavaDoc getQueueFromInitialContext(String JavaDoc queueName)
194         throws NamingException JavaDoc {
195         return (Queue JavaDoc) initialContext.lookup(queueName);
196     }
197
198     public PooledSession get(boolean replyCapable) throws JMSException JavaDoc {
199         return get(null, replyCapable);
200     }
201     
202     /**
203      * Retrieve a new or cached Session.
204      * @param replyDest Destination name if coming from wsa:Header
205      * @param replyCapable true iff the session is to be used to receive replies
206      * (implies client side twoway invocation )
207      * @return a new or cached Session
208      */

209     public PooledSession get(Destination JavaDoc replyDest, boolean replyCapable) throws JMSException JavaDoc {
210         PooledSession ret = null;
211
212         synchronized (this) {
213             if (replyCapable) {
214                 // first try reply capable cache
215
//
216
ret = replyCapableSessionCache.poll();
217
218                 if (ret == null) {
219                     // fall back to send only cache, creating temporary reply
220
// queue and consumer
221
//
222
ret = sendOnlySessionCache.poll();
223
224                     if (ret != null) {
225                         QueueSession JavaDoc session = (QueueSession JavaDoc)ret.session();
226                         Queue JavaDoc destination = null;
227                         String JavaDoc selector = null;
228                         
229                         if (null != theReplyDestination || null != replyDest) {
230                             destination = null != replyDest ? (Queue JavaDoc) replyDest : (Queue JavaDoc)theReplyDestination;
231                             
232                             selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'";
233                         }
234                         
235                         ret.destination(destination);
236                         MessageConsumer JavaDoc consumer = session.createReceiver(destination, selector);
237                         ret.consumer(consumer);
238                     } else {
239                         // no pooled session available in either cache => create one in
240
// in the reply capable cache
241
//
242
try {
243                             ret = replyCapableSessionCache.get();
244                         } catch (Throwable JavaDoc t) {
245                             // factory method may only throw JMSException
246
//
247
throw (JMSException JavaDoc)t;
248                         }
249                     }
250                 }
251             } else {
252                 // first try send only cache
253
//
254
ret = sendOnlySessionCache.poll();
255
256                 if (ret == null) {
257                     // fall back to reply capable cache if one exists (only in the
258
// point-to-point domain), ignoring temporary reply destination
259
// and consumer
260
//
261
if (replyCapableSessionCache != null) {
262                         ret = replyCapableSessionCache.poll();
263                     }
264
265                     if (ret == null) {
266                         // no pooled session available in either cache => create one in
267
// in the send only cache
268
//
269
try {
270                             ret = sendOnlySessionCache.get();
271                         } catch (Throwable JavaDoc t) {
272                             // factory method may only throw JMSException
273
//
274
throw (JMSException JavaDoc)t;
275                         }
276                     }
277                 }
278             }
279         }
280
281         return ret;
282     }
283
284     /**
285      * Retrieve a new
286      *
287      * @param destination the target JMS queue or topic (non-null implies
288      * server receive side)
289      * @return a new or cached Session
290      */

291     public PooledSession get(Destination JavaDoc destination) throws JMSException JavaDoc {
292         PooledSession ret = null;
293
294         // the destination is only specified on the server receive side,
295
// in which case a new session is always created
296
//
297
if (isQueueConnection) {
298             ret = createPointToPointServerSession(destination);
299         } else {
300             ret = createPubSubSession(false, true, destination);
301         }
302
303         return ret;
304     }
305
306     /**
307      * Return a Session to the pool
308      *
309      * @param pooled_session the session to recycle
310      */

311     public void recycle(PooledSession pooledSession) {
312         // sessions used long-term by the server receive side are not cached,
313
// only non-null destinations are temp queues
314
final boolean replyCapable = pooledSession.destination() != null;
315         boolean discard = false;
316
317         synchronized (this) {
318             // re-cache session, closing if it cannot be it can be accomodated
319
//
320
discard = replyCapable ? (!replyCapableSessionCache.recycle(pooledSession))
321                 : (!sendOnlySessionCache.recycle(pooledSession));
322         }
323
324         if (discard) {
325             try {
326                 pooledSession.close();
327             } catch (JMSException JavaDoc e) {
328                 LOG.log(Level.WARNING, "JMS Session discard failed: " + e);
329             }
330         }
331     }
332
333
334     /**
335      * Shutdown the session factory.
336      */

337     public void shutdown() {
338         try {
339             PooledSession curr;
340
341             if (replyCapableSessionCache != null) {
342                 curr = replyCapableSessionCache.poll();
343                 while (curr != null) {
344                     curr.close();
345                     curr = replyCapableSessionCache.poll();
346                 }
347             }
348
349             if (sendOnlySessionCache != null) {
350                 curr = sendOnlySessionCache.poll();
351                 while (curr != null) {
352                     curr.close();
353                     curr = sendOnlySessionCache.poll();
354                 }
355             }
356
357             theConnection.close();
358         } catch (JMSException JavaDoc e) {
359             LOG.log(Level.WARNING, "queue connection close failed: " + e);
360         }
361
362         // help GC
363
//
364
replyCapableSessionCache = null;
365         sendOnlySessionCache = null;
366     }
367
368
369     /**
370      * Helper method to create a point-to-point pooled session.
371      *
372      * @param producer true iff producing
373      * @param consumer true iff consuming
374      * @param destination the target destination
375      * @return an appropriate pooled session
376      */

377     PooledSession createPointToPointReplyCapableSession() throws JMSException JavaDoc {
378         QueueSession JavaDoc session =
379             ((QueueConnection JavaDoc)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
380         Destination JavaDoc destination = null;
381         String JavaDoc selector = null;
382         
383         if (null != theReplyDestination) {
384             destination = theReplyDestination;
385             
386             selector = "JMSCorrelationID = '" + generateUniqueSelector(session) + "'";
387             
388             
389         } else {
390             destination = session.createTemporaryQueue();
391         }
392         
393         MessageConsumer JavaDoc consumer = session.createReceiver((Queue JavaDoc)destination, selector);
394         return new PooledSession(session,
395                                  destination,
396                                  session.createSender(null),
397                                  consumer);
398     }
399
400
401     /**
402      * Helper method to create a point-to-point pooled session.
403      *
404      * @return an appropriate pooled session
405      */

406     PooledSession createPointToPointSendOnlySession() throws JMSException JavaDoc {
407         QueueSession JavaDoc session =
408             ((QueueConnection JavaDoc)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
409
410         return new PooledSession(session, null, session.createSender(null), null);
411     }
412
413
414     /**
415      * Helper method to create a point-to-point pooled session for consumer only.
416      *
417      * @param destination the target destination
418      * @return an appropriate pooled session
419      */

420     private PooledSession createPointToPointServerSession(Destination JavaDoc destination) throws JMSException JavaDoc {
421         QueueSession JavaDoc session =
422             ((QueueConnection JavaDoc)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
423
424
425         return new PooledSession(session, destination, session.createSender(null),
426                                  session.createReceiver((Queue JavaDoc)destination,
427                                  jmsServerPolicy.getMessageSelector()));
428     }
429
430
431     /**
432      * Helper method to create a pub-sub pooled session.
433      *
434      * @param producer true iff producing
435      * @param consumer true iff consuming
436      * @param destination the target destination
437      * @return an appropriate pooled session
438      */

439     PooledSession createPubSubSession(boolean producer,
440                                               boolean consumer,
441                                               Destination JavaDoc destination) throws JMSException JavaDoc {
442         TopicSession JavaDoc session = ((TopicConnection JavaDoc)theConnection).createTopicSession(false,
443                                                                                    Session.AUTO_ACKNOWLEDGE);
444         TopicSubscriber JavaDoc sub = null;
445         if (consumer) {
446             String JavaDoc messageSelector = jmsServerPolicy.getMessageSelector();
447             String JavaDoc durableName = jmsServerPolicy.getDurableSubscriberName();
448             if (durableName != null) {
449                 sub = session.createDurableSubscriber((Topic JavaDoc)destination,
450                                                       durableName,
451                                                       messageSelector,
452                                                       false);
453             } else {
454                 sub = session.createSubscriber((Topic JavaDoc)destination,
455                                                messageSelector,
456                                                false);
457             }
458         }
459
460         return new PooledSession(session,
461                                  null,
462                                  producer ? session.createPublisher(null) : null,
463                                  sub);
464     }
465     
466     private String JavaDoc generateUniqueSelector(Object JavaDoc obj) {
467         String JavaDoc host = "localhost";
468
469         try {
470             InetAddress JavaDoc addr = InetAddress.getLocalHost();
471             host = addr.getHostName();
472         } catch (UnknownHostException JavaDoc ukex) {
473             //Default to localhost.
474
}
475
476         long time = Calendar.getInstance().getTimeInMillis();
477         return host + "_"
478             + System.getProperty("user.name") + "_"
479             + obj + time;
480     }
481 }
482
Popular Tags