KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > jms > client > p2p > P2PSessionDelegate


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7 package org.jboss.jms.client.p2p;
8
9 import java.io.Serializable JavaDoc;
10 import java.util.ArrayList JavaDoc;
11 import java.util.Collection JavaDoc;
12 import java.util.Iterator JavaDoc;
13 import java.util.List JavaDoc;
14 import java.util.Map JavaDoc;
15 import java.util.TreeMap JavaDoc;
16
17 import javax.jms.BytesMessage JavaDoc;
18 import javax.jms.Destination JavaDoc;
19 import javax.jms.JMSException JavaDoc;
20 import javax.jms.MapMessage JavaDoc;
21 import javax.jms.Message JavaDoc;
22 import javax.jms.MessageListener JavaDoc;
23 import javax.jms.ObjectMessage JavaDoc;
24 import javax.jms.Queue JavaDoc;
25 import javax.jms.Session JavaDoc;
26 import javax.jms.StreamMessage JavaDoc;
27 import javax.jms.TextMessage JavaDoc;
28 import javax.transaction.xa.XAResource JavaDoc;
29
30 import org.jboss.jms.BytesMessageImpl;
31 import org.jboss.jms.MapMessageImpl;
32 import org.jboss.jms.MessageImpl;
33 import org.jboss.jms.ObjectMessageImpl;
34 import org.jboss.jms.StreamMessageImpl;
35 import org.jboss.jms.TextMessageImpl;
36 import org.jboss.jms.client.BrowserDelegate;
37 import org.jboss.jms.client.ConsumerDelegate;
38 import org.jboss.jms.client.ProducerDelegate;
39 import org.jboss.jms.client.SessionDelegate;
40
41 /**
42  * The p2p session
43  *
44  * @author <a HREF="mailto:nathan@jboss.org">Nathan Phelps</a>
45  * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
46  * @version $Revision: 1.3 $
47  */

48 public class P2PSessionDelegate
49    implements SessionDelegate
50 {
51    // Constants -----------------------------------------------------
52

53    // Attributes ----------------------------------------------------
54

55    private P2PConnectionDelegate connection = null;
56    private int acknowledgeMode;
57    private boolean closed = false; // TOD: make sure this is the default.
58
private MessageListener JavaDoc messageListener = null;
59    private boolean transacted = false;
60    // TOD: Might be able to eliminate the seperate lists by implementing a
61
//interface which just does a close() if that is all we're uisng this for...
62
private List JavaDoc messageConsumers = new ArrayList JavaDoc();
63    private List JavaDoc messageProducers = new ArrayList JavaDoc();
64    private List JavaDoc queueBrowsers = new ArrayList JavaDoc();
65
66    private Map JavaDoc unacknowledgedMessageMap = new TreeMap JavaDoc();
67    private long nextDeliveryId = 0;
68    private boolean recovering = false;
69    private Object JavaDoc recoveryLock = new Object JavaDoc();
70    private List JavaDoc uncommittedMessages = new ArrayList JavaDoc();
71
72    // Static --------------------------------------------------------
73

74    // Constructors --------------------------------------------------
75

76    public P2PSessionDelegate(P2PConnectionDelegate connection, boolean transaction, int acknowledgeMode)
77       throws JMSException JavaDoc
78    {
79       this.connection = connection;
80       this.transacted = transaction;
81       this.acknowledgeMode = acknowledgeMode;
82    }
83
84    // Public --------------------------------------------------------
85

86    // SessionDelegate implementation -----------------------------
87

88    public void close() throws JMSException JavaDoc
89    {
90       if (!this.closed)
91       {
92           if (this.transacted)
93           {
94               this.rollback();
95           }
96           Iterator JavaDoc iterator = this.messageConsumers.iterator();
97           while (iterator.hasNext())
98           {
99               ((ConsumerDelegate) iterator.next()).close();
100               iterator.remove();
101           }
102           iterator = this.messageProducers.iterator();
103           while (iterator.hasNext())
104           {
105               ((ProducerDelegate) iterator.next()).close();
106           }
107           iterator = this.queueBrowsers.iterator();
108           while (iterator.hasNext())
109           {
110               ((BrowserDelegate) iterator.next()).close();
111           }
112           this.closed = true;
113       }
114    }
115
116    public void closing() throws JMSException JavaDoc
117    {
118    }
119
120    public void commit() throws JMSException JavaDoc
121    {
122       this.throwExceptionIfClosed();
123       if (this.transacted)
124       {
125           this.recovering = true;
126           if (this.uncommittedMessages.size() > 0)
127           {
128               this.connection.send((Collection JavaDoc) ((ArrayList JavaDoc) this.uncommittedMessages).clone());
129           }
130           this.unacknowledgedMessageMap.clear();
131           this.uncommittedMessages.clear();
132           this.recovering = false;
133           synchronized (this.recoveryLock)
134           {
135               this.recoveryLock.notify();
136           }
137       }
138       else
139       {
140           throw new IllegalStateException JavaDoc("Illegal Operation: This is not a transacted Session.");
141       }
142    }
143
144    public BrowserDelegate createBrowser(Queue JavaDoc queue, String JavaDoc selector) throws JMSException JavaDoc
145    {
146       this.throwExceptionIfClosed();
147       return new P2PBrowserDelegate(this, queue, selector);
148    }
149
150    public BytesMessage JavaDoc createBytesMessage() throws JMSException JavaDoc
151    {
152       this.throwExceptionIfClosed();
153       return new BytesMessageImpl();
154    }
155
156    public ConsumerDelegate createConsumer(
157       Destination JavaDoc destination,
158       String JavaDoc subscription,
159       String JavaDoc selector,
160       boolean noLocal)
161       throws JMSException JavaDoc
162    {
163       this.throwExceptionIfClosed();
164       ConsumerDelegate messageConsumer = new P2PConsumerDelegate(this, destination, selector, noLocal);
165       this.messageConsumers.add(messageConsumer);
166       return messageConsumer;
167    }
168
169    public MapMessage JavaDoc createMapMessage() throws JMSException JavaDoc
170    {
171       this.throwExceptionIfClosed();
172       return new MapMessageImpl();
173    }
174
175    public javax.jms.Message JavaDoc createMessage() throws JMSException JavaDoc
176    {
177       this.throwExceptionIfClosed();
178       return new MessageImpl();
179    }
180
181    public ObjectMessage JavaDoc createObjectMessage(Serializable JavaDoc object) throws JMSException JavaDoc
182    {
183       this.throwExceptionIfClosed();
184       return new ObjectMessageImpl(object);
185    }
186
187    public ProducerDelegate createProducer(Destination JavaDoc destination) throws JMSException JavaDoc
188    {
189       this.throwExceptionIfClosed();
190       ProducerDelegate messageProducer = new P2PProducerDelegate(this, destination);
191       this.messageProducers.add(messageProducer);
192       return messageProducer;
193    }
194
195    public StreamMessage JavaDoc createStreamMessage() throws JMSException JavaDoc
196    {
197       this.throwExceptionIfClosed();
198       return new StreamMessageImpl();
199    }
200
201    public Destination JavaDoc createTempDestination(int type) throws JMSException JavaDoc
202    {
203       // TODO createTempDestination
204
return null;
205    }
206
207    public TextMessage JavaDoc createTextMessage(String JavaDoc text) throws JMSException JavaDoc
208    {
209       this.throwExceptionIfClosed();
210       return new TextMessageImpl(text);
211    }
212
213    public Destination JavaDoc getDestination(String JavaDoc name) throws JMSException JavaDoc
214    {
215       // TODO getDestination
216
return null;
217    }
218
219    public XAResource JavaDoc getXAResource()
220    {
221       // TODO getXAResource
222
return null;
223    }
224
225    public void recover() throws JMSException JavaDoc
226    {
227       this.throwExceptionIfClosed();
228       if (this.transacted)
229       {
230           throw new IllegalStateException JavaDoc("Illegal Operation: This is a transacted Session. Use rollback instead.");
231       }
232       synchronized (this.unacknowledgedMessageMap)
233       {
234           this.recovering = true;
235           Map JavaDoc clone = (Map JavaDoc) ((TreeMap JavaDoc) this.unacknowledgedMessageMap).clone();
236           this.unacknowledgedMessageMap.clear();
237           this.restart(clone);
238       }
239    }
240
241    public void rollback() throws JMSException JavaDoc
242    {
243       this.throwExceptionIfClosed();
244       if (this.transacted)
245       {
246           synchronized (this.unacknowledgedMessageMap)
247           {
248               this.recovering = true;
249               Map JavaDoc clone = (Map JavaDoc) ((TreeMap JavaDoc) this.unacknowledgedMessageMap).clone();
250               this.unacknowledgedMessageMap.clear();
251               this.restart(clone);
252           }
253           this.uncommittedMessages.clear();
254       }
255       else
256       {
257           throw new IllegalStateException JavaDoc("Illegal Operation: This is not a transacted Session.");
258       }
259    }
260
261    public void run()
262    {
263       // TODO run
264
}
265
266    public void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc
267    {
268       this.throwExceptionIfClosed();
269       this.messageListener = listener;
270    }
271
272    public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc
273    {
274       this.throwExceptionIfClosed();
275    }
276
277    // Protected ------------------------------------------------------
278

279    // Package Private ------------------------------------------------
280

281    synchronized void send(MessageImpl message) throws JMSException JavaDoc
282    {
283        if (this.transacted)
284        {
285            this.uncommittedMessages.add(message.clone());
286        }
287        else
288        {
289            this.connection.send(message);
290        }
291    }
292
293    public void acknowledge(Message JavaDoc message, boolean acknowledge)
294    {
295        if (!this.transacted)
296        {
297            synchronized (this.unacknowledgedMessageMap)
298            {
299                Iterator JavaDoc iterator = this.unacknowledgedMessageMap.keySet().iterator();
300                while (iterator.hasNext())
301                {
302                    Long JavaDoc currentKey = (Long JavaDoc) iterator.next();
303                    if (currentKey.longValue() <= ((MessageImpl) message).deliveryId)
304                    {
305                        iterator.remove();
306                    }
307                }
308            }
309        }
310    }
311    void deliver(MessageImpl message)
312    {
313        this.deliver(message, false);
314    }
315
316    private void deliver(MessageImpl message, boolean recoveryOperation)
317    {
318        if (this.recovering && !recoveryOperation)
319        {
320            synchronized (this.recoveryLock)
321            {
322                try
323                {
324                    this.recoveryLock.wait();
325                }
326                catch (InterruptedException JavaDoc e)
327                {
328                }
329            }
330        }
331        message.setSession(this);
332        message.setDeliveryId(++this.nextDeliveryId);
333        Iterator JavaDoc iterator = this.messageConsumers.iterator();
334        if (this.acknowledgeMode != Session.AUTO_ACKNOWLEDGE)
335        {
336            synchronized (unacknowledgedMessageMap)
337            {
338                this.unacknowledgedMessageMap.put(new Long JavaDoc(this.nextDeliveryId), message);
339            }
340        }
341        while (iterator.hasNext())
342        {
343            ((P2PConsumerDelegate) iterator.next()).deliver(message);
344        }
345    }
346    // Private --------------------------------------------------------
347

348    private void throwExceptionIfClosed() throws IllegalStateException JavaDoc
349    {
350        if (this.closed)
351        {
352            throw new IllegalStateException JavaDoc("The session is closed.");
353        }
354    }
355
356    private void restart(final Map JavaDoc unacknowledgedMessage)
357    {
358        Thread JavaDoc thread = new Thread JavaDoc(new Runnable JavaDoc()
359        {
360            public void run()
361            {
362                Iterator JavaDoc iterator = unacknowledgedMessage.keySet().iterator();
363                while (iterator.hasNext())
364                {
365                    MessageImpl message = (MessageImpl) unacknowledgedMessage.get(iterator.next());
366                    message.setJMSRedelivered(true);
367                    deliver(message, true);
368                }
369                recovering = false;
370                synchronized (recoveryLock)
371                {
372                    recoveryLock.notify();
373                }
374            }
375        });
376        thread.start();
377    }
378
379    // Inner Classes --------------------------------------------------
380

381 }
382
Popular Tags