KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > client > impl > Session


package com.ubermq.jms.client.impl;

import EDU.oswego.cs.dl.util.concurrent.*;
import com.ubermq.jms.client.*;
import com.ubermq.jms.client.msg.*;
import com.ubermq.jms.common.datagram.*;
import com.ubermq.kernel.*;
import java.util.*;
import javax.jms.*;

import javax.jms.Queue;
12
13 /**
14  * The session base class manages resources shared across a session including
15  * the thread used for delivering messages, message creation and other informational
16  * points.
17  *
18  */

19 class Session
20     implements javax.jms.Session JavaDoc
21 {
22     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(Session.class);
23     
24     private static final int SESSION_BUFFER_SIZE =
25         Integer.valueOf(Configurator.getProperty(ClientConfig.SESSION_BOUNDED_BUFFER_SIZE, "100")).intValue();
26
27     private static final String JavaDoc TEMP_TOPIC_PREFIX = "$TT-";
28     private static final String JavaDoc TEMP_QUEUE_PREFIX = "$TEMPQ-";
29
30     static final boolean DEFAULT_NO_LOCAL = false;
31
32     /**
33      * The Connection that is parent of this session.
34      */

35     Connection conn;
36
37     /**
38      * the message datagram factory
39      */

40     final IMessageDatagramFactory factory;
41
42     /**
43      * the session global message listener. it is not used, and the intention
44      * is not specified by JMS.
45      */

46     private MessageListener listener;
47
48     /**
49      * The JMS acknowledgement mode.
50      */

51     final int ackMode;
52
53     /**
54      * the list of all consumers for this session.
55      */

56     private List consumers;
57
58     // whether the session is going away
59
private boolean isClosing = false;
60
61     // whether we need an async delivery thread
62
private boolean needsAsyncDelivery = false;
63
64     // async consumer delivery queue
65
private Channel deliveryQueue;
66     private PooledExecutor executor;
67
68     /**
69      * Creates a session with a message datagram factory and ack mode.
70      * @param f a message factory
71      * @param ackMode a JMS acknowledgement mode
72      */

73     Session(Connection conn, IMessageDatagramFactory f, int ackMode)
74     {
75         this.conn = conn;
76         this.factory = f;
77         this.ackMode = ackMode;
78         this.consumers = new ArrayList();
79
80         deliveryQueue = new BoundedPriorityQueue(SESSION_BUFFER_SIZE,
81                                                  new Comparator() {
82                     public int compare(Object JavaDoc o1, Object JavaDoc o2)
83                     {
84                         DeliveryTask dt1 = (DeliveryTask)o1,
85                             dt2 = (DeliveryTask)o2;
86
87                         return ((LocalMessage)dt1.msg).compareTo(dt2.msg);
88                     }
89                 });
90     }
91
92     /**
93      * Adds a consumer to this session.
94      *
95      * @param c an AbstractConsumer
96      */

97     void addConsumer(AbstractConsumer c)
98     {
99         consumers.add(c);
100     }
101
102     /**
103      * Removes a consumer from this session.
104      *
105      * @param c an AbstractConsumer, added previously
106      * with <code>addConsumer</code>.
107      */

108     void removeConsumer(AbstractConsumer c)
109     {
110         consumers.remove(c);
111     }
112
113     // UTILITY
114

115     private String JavaDoc createRandomIdentifier()
116     {
117         return com.ubermq.util.Utility.allocateLocallyUniqueInt() + "-" + new Random().nextInt();
118     }
119
120     // TOPICS
121

122     public TopicSubscriber createDurableSubscriber(Topic t, String JavaDoc name, String JavaDoc selector, boolean noLocal)
123         throws JMSException
124     {
125         if (isClosing)
126             throw new javax.jms.IllegalStateException JavaDoc("closed");
127
128         return new LocalTopicSubscriber(t,
129                                         selector,
130                                         noLocal,
131                                         name,
132                                         this,
133                                             (selector != null) ? new NullDeliveryManager() : conn.delivery.newInstance());
134     }
135
136     public Topic createTopic(String JavaDoc p0) throws JMSException
137     {
138         if (isClosing)
139             throw new javax.jms.IllegalStateException JavaDoc("closed");
140
141         return new LocalTopic(p0);
142     }
143
144     public TopicSubscriber createDurableSubscriber(Topic t, String JavaDoc name) throws JMSException
145     {
146         if (isClosing)
147             throw new javax.jms.IllegalStateException JavaDoc("closed");
148
149         return createDurableSubscriber(t, name, null, DEFAULT_NO_LOCAL);
150     }
151
152     public void unsubscribe(String JavaDoc name) throws JMSException
153     {
154         if (isClosing)
155             throw new javax.jms.IllegalStateException JavaDoc("closed");
156
157         conn.getClientProcessor().unregisterDurableSubscription(name);
158     }
159
160     public TemporaryTopic createTemporaryTopic() throws JMSException
161     {
162         if (isClosing)
163             throw new javax.jms.IllegalStateException JavaDoc("closed");
164
165         return new LocalTopic(TEMP_TOPIC_PREFIX + createRandomIdentifier());
166     }
167
168     // QUEUES
169

170     public TemporaryQueue createTemporaryQueue() throws JMSException
171     {
172         if (isClosing)
173             throw new javax.jms.IllegalStateException JavaDoc("closed");
174
175         return new LocalQueue(this, TEMP_QUEUE_PREFIX + createRandomIdentifier());
176     }
177
178     public QueueBrowser createBrowser(Queue JavaDoc q) throws JMSException
179     {
180         if (isClosing)
181             throw new javax.jms.IllegalStateException JavaDoc("closed");
182
183         return createBrowser(q, null);
184     }
185
186     public QueueBrowser createBrowser(Queue JavaDoc q, String JavaDoc selector) throws JMSException
187     {
188         if (isClosing)
189             throw new javax.jms.IllegalStateException JavaDoc("closed");
190
191         throw new UnsupportedOperationException JavaDoc();
192     }
193
194     public Queue JavaDoc createQueue(String JavaDoc name) throws JMSException
195     {
196         if (isClosing)
197             throw new javax.jms.IllegalStateException JavaDoc("closed");
198
199         return new LocalQueue(this, name);
200     }
201
202     // GENERICS
203

204     public int getAcknowledgeMode() throws JMSException
205     {
206         return ackMode;
207     }
208
209     public MessageConsumer createConsumer(Destination p0) throws JMSException
210     {
211         if (isClosing)
212             throw new javax.jms.IllegalStateException JavaDoc("closed");
213
214         return createConsumer(p0, null, DEFAULT_NO_LOCAL);
215     }
216
217     public MessageConsumer createConsumer(Destination d, String JavaDoc selector) throws JMSException
218     {
219         if (isClosing)
220             throw new javax.jms.IllegalStateException JavaDoc("closed");
221
222         return createConsumer(d, selector, DEFAULT_NO_LOCAL);
223     }
224
225     public MessageConsumer createConsumer(Destination d, String JavaDoc selector, boolean noLocal)
226         throws JMSException
227     {
228         if (isClosing)
229             throw new javax.jms.IllegalStateException JavaDoc("closed");
230
231         AbstractConsumer c;
232
233         if (d instanceof Queue JavaDoc)
234             c = new QueueReceiver((Queue JavaDoc)d, selector, this);
235         else if (d instanceof Topic)
236         {
237             c = new LocalTopicSubscriber((Topic)d,
238                                          selector,
239                                          noLocal,
240                                          this,
241                                              (selector != null) ? new NullDeliveryManager() : conn.delivery.newInstance());
242         }
243         else
244             throw new InvalidDestinationException("destination must be a topic or queue");
245
246         return c;
247     }
248
249     public MessageProducer createProducer(Destination p0)
250         throws JMSException
251     {
252         if (isClosing)
253             throw new javax.jms.IllegalStateException JavaDoc("closed");
254
255         return new AbstractProducer(p0, this);
256     }
257
258     // MESSAGES
259

260     public BytesMessage createBytesMessage()
261         throws JMSException
262     {
263         if (isClosing)
264             throw new javax.jms.IllegalStateException JavaDoc("closed");
265
266         return new LocalBytesMessage(factory);
267     }
268
269     public MapMessage createMapMessage()
270         throws JMSException
271     {
272         if (isClosing)
273             throw new javax.jms.IllegalStateException JavaDoc("closed");
274
275         return new LocalMapMessage(factory);
276     }
277
278     public Message createMessage()
279         throws JMSException
280     {
281         if (isClosing)
282             throw new javax.jms.IllegalStateException JavaDoc("closed");
283
284         return new LocalMessage(factory);
285     }
286
287     public ObjectMessage createObjectMessage()
288         throws JMSException
289     {
290         if (isClosing)
291             throw new javax.jms.IllegalStateException JavaDoc("closed");
292
293         return new LocalObjectMessage(factory);
294     }
295
296     public ObjectMessage createObjectMessage(java.io.Serializable JavaDoc object)
297         throws JMSException
298     {
299         if (isClosing)
300             throw new javax.jms.IllegalStateException JavaDoc("closed");
301
302         ObjectMessage om = createObjectMessage();
303         om.setObject(object);
304         return om;
305     }
306
307     public StreamMessage createStreamMessage()
308         throws JMSException
309     {
310         if (isClosing)
311             throw new javax.jms.IllegalStateException JavaDoc("closed");
312
313         return new LocalStreamMessage(factory);
314     }
315
316     public TextMessage createTextMessage()
317         throws JMSException
318     {
319         if (isClosing)
320             throw new javax.jms.IllegalStateException JavaDoc("closed");
321
322         return new LocalTextMessage(factory);
323     }
324
325     public TextMessage createTextMessage(String JavaDoc text)
326         throws JMSException
327     {
328         TextMessage tm = createTextMessage();
329         tm.setText(text);
330         return tm;
331     }
332
333     public boolean getTransacted()
334     {
335         return false;
336     }
337
338     public void commit()
339         throws JMSException
340     {
341         // TODO: implement transactional??
342
// silently ignore this, because commit's are OK.
343
}
344
345     public void rollback()
346         throws JMSException
347     {
348         // TODO: implement transactional??
349
// this won't work as anticipated, so throw unsupported operation.
350
throw new JMSUnsupportedOperationException();
351     }
352
353     boolean isClosing() {return isClosing;}
354
355     public void close() throws JMSException
356     {
357         if (!isClosing)
358         {
359             isClosing = true;
360             conn.removeSession(this);
361             stopDeliveryThread();
362         }
363     }
364
365     public void recover()
366         throws JMSException
367     {
368         // TODO: we don't currently support recovery...
369
// that is because the server forgets about messages we have
370
// not acknowledged unless you are durable.
371
// the JMS spec says that you are supposed to be able to get unack'd messages
372
// if you want...
373
throw new JMSUnsupportedOperationException();
374     }
375
376     public MessageListener getMessageListener()
377         throws JMSException
378     {
379         return listener;
380     }
381
382     public void setMessageListener(MessageListener listener)
383         throws JMSException
384     {
385         if (isClosing)
386             throw new javax.jms.IllegalStateException JavaDoc("closed");
387
388         this.listener = listener;
389         checkDeliveryThread();
390     }
391
392     public void run()
393     {
394     }
395
396     /**
397      * enqueue a Message for asynchronous delivery by the session delivery
398      * thread.
399      * @param msg the message to deliver
400      * @param listener the JMS message listener to deliver to
401      * @throws InterruptedException if the thread is interrupted while the
402      * delivery buffer is full.
403      */

404     void asyncDelivery(final Message msg,
405                        final AbstractConsumer sub,
406                        final MessageListener listener)
407         throws InterruptedException JavaDoc
408     {
409         deliveryQueue.put(new DeliveryTask(msg, sub, listener));
410     }
411
412     private class DeliveryTask
413         implements Runnable JavaDoc, Comparable JavaDoc
414     {
415         private final Message msg;
416         private final AbstractConsumer sub;
417         private final MessageListener listener;
418
419         DeliveryTask(Message msg,
420                      AbstractConsumer sub,
421                      MessageListener listener)
422         {
423             this.msg = msg;
424             this.sub = sub;
425             this.listener = listener;
426         }
427
428         public int compareTo(Object JavaDoc o)
429         {
430             try
431             {
432                 return msg.getJMSPriority() - ((DeliveryTask)o).msg.getJMSPriority();
433             }
434             catch (JMSException e) {
435                 return 0;
436             }
437         }
438
439         public void run() {
440             try {
441                 if (!sub.isClosing() &&
442                     !isClosing())
443                 {
444                     listener.onMessage(msg);
445                 }
446             } catch(RuntimeException JavaDoc re) {
447                 log.error("", re);
448                 if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
449                     // according to JMS spec, ignore
450
;
451                 } else {
452                     // for DUPS_OK and AUTO_ we try immediately
453
// one more time, and then move on.
454
try {
455                         listener.onMessage(msg);
456                     } catch(RuntimeException JavaDoc re2) {
457                         ; // move on... the client is hosed prob.
458
}
459                 }
460             }
461         }
462     }
463
464     /**
465      * starts the delivery thread
466      */

467     synchronized private void startDeliveryThread()
468     {
469         executor = new PooledExecutor(deliveryQueue);
470         executor.setThreadFactory( new ThreadFactory() {
471                     public Thread JavaDoc newThread(Runnable JavaDoc p0)
472                     {
473                         Thread JavaDoc t = new Thread JavaDoc(p0, "Session Delivery Thread");
474                         t.setDaemon(false);
475                         return t;
476                     }
477                 });
478         executor.waitWhenBlocked();
479         executor.setKeepAliveTime(-1);
480         executor.setMinimumPoolSize(1);
481         executor.setMaximumPoolSize(1);
482         executor.createThreads(1);
483     }
484
485     /**
486      * Indicates that a subscriber needs a delivery thread to
487      * be running for this session. Delivery threads are used
488      * in conjunction with asynchronous subscribers.
489      */

490     synchronized void requestDeliveryThread()
491     {
492         this.needsAsyncDelivery = true;
493     }
494
495     /**
496      * Ensures the delivery thread is running. Starts it if not,
497      * and if someone has requested a delivery thread using
498      * the <code>requestDeliveryThread</code> method.
499      */

500     synchronized void checkDeliveryThread()
501     {
502         if (executor == null &&
503             needsAsyncDelivery)
504             startDeliveryThread();
505     }
506
507     /**
508      * Stops the delivery thread.
509      */

510     synchronized void stopDeliveryThread()
511     {
512         if (executor != null) {
513             executor.shutdownNow();
514             executor = null;
515         }
516     }
517
518     /**
519      * Pauses the session from delivering messages.
520      */

521     void pause()
522     {
523         stopDeliveryThread();
524
525         Iterator iter = consumers.iterator();
526         while (iter.hasNext())
527         {
528             AbstractConsumer lts = (AbstractConsumer)iter.next();
529             lts.pause();
530         }
531     }
532
533     /**
534      * Resumes delivery of messages.
535      */

536     void resume()
537     {
538         checkDeliveryThread();
539
540         Iterator iter = consumers.iterator();
541         while (iter.hasNext())
542         {
543             AbstractConsumer lts = (AbstractConsumer)iter.next();
544             lts.resume();
545         }
546     }
547
548 }
549
550
Popular Tags