KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > jms > serverless > SessionImpl


/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */

package org.jboss.jms.serverless;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.jboss.logging.Logger;

33 /**
34  *
35  * @author Ovidiu Feodorov <ovidiu@jboss.org>
36  * @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $
37  *
38  **/

39 class SessionImpl implements Session JavaDoc {
40
41     private static final Logger log = Logger.getLogger(SessionImpl.class);
42
43     private SessionManager sessionManager;
44     private String JavaDoc id;
45     private List JavaDoc subscribers;
46     private List JavaDoc receivers;
47     private boolean transacted;
48     private int acknowledgeMode;
49     private int receiverCounter = 0;
50
51     /**
52      * @param id - the session id. The SessionManager instance guarantees uniqueness during its
53      * lifetime.
54      **/

55     SessionImpl(SessionManager sessionManager,
56                 String JavaDoc id,
57                 boolean transacted,
58                 int acknowledgeMode) {
59         
60         this.sessionManager = sessionManager;
61         this.id = id;
62         subscribers = new ArrayList JavaDoc();
63         receivers = new ArrayList JavaDoc();
64         this.transacted = transacted;
65         this.acknowledgeMode = acknowledgeMode;
66
67         if (transacted) {
68             throw new NotImplementedException("Transacted sessions not supported");
69         }
70     }
71
72     public String JavaDoc getID() {
73         return id;
74     }
75
76     void send(Message JavaDoc m) throws JMSException JavaDoc {
77         sessionManager.getConnection().send(m);
78     }
79
80     /**
81      * Delivery to topic subscribers.
82      **/

83     // TO_DO: acknowledgement, deal with failed deliveries
84
void deliver(Message JavaDoc m) {
85
86         // TO_DO: single threaded access for sessions
87
// So far, the only thread that accesses dispatch() is the connection's puller thread and
88
// this will be the unique thread that accesses the Sessions. This may not be sufficient
89
// for high load, consider the possiblity to (dynamically) add new threads to handle
90
// delivery, possibly a thread per session.
91

92         Destination JavaDoc destination = null;
93         try {
94             destination = m.getJMSDestination();
95         }
96         catch(JMSException JavaDoc e) {
97             // TO_DO: cannot deliver, a failure handler should take over
98
log.error("Unhandled failure", e);
99             return;
100         }
101
102         // TO_DO: properly handle the case when the destination is null
103

104         for(Iterator JavaDoc i = subscribers.iterator(); i.hasNext(); ) {
105
106             TopicSubscriberImpl sub = (TopicSubscriberImpl)i.next();
107             if (destination.equals(sub.getDestination())) {
108                 MessageListener JavaDoc l = null;
109                 try {
110                     l = sub.getMessageListener();
111                 }
112                 catch(JMSException JavaDoc e) {
113                     // TO_DO: cannot deliver, a failure handler should take over
114
log.error("Unhandled failure", e);
115                     continue;
116                 }
117                 if (l == null) {
118                     continue;
119                 }
120                 l.onMessage(m);
121             }
122         }
123     }
124
125     /**
126      * Delivery to queue receivers.
127      **/

128     // TO_DO: acknowledgement, deal with failed deliveries
129
void deliver(Message JavaDoc m, String JavaDoc receiverID) {
130
131         // TO_DO: single threaded access for sessions
132
// So far, the only thread that accesses dispatch() is the connection's puller thread and
133
// this will be the unique thread that accesses the Sessions. This may not be sufficient
134
// for high load, consider the possiblity to (dynamically) add new threads to handle
135
// delivery, possibly a thread per session.
136

137         QueueReceiverImpl receiver = null;
138         for(Iterator JavaDoc i = receivers.iterator(); i.hasNext(); ) {
139
140             QueueReceiverImpl crtRec = (QueueReceiverImpl)i.next();
141             if (crtRec.getID().equals(receiverID)) {
142                 receiver = crtRec;
143                 break;
144             }
145         }
146
147         if (receiver == null) {
148             log.error("No such receiver: "+receiverID+". Delivery failed!");
149             return;
150         }
151         MessageListener JavaDoc l = null;
152         try {
153             l = receiver.getMessageListener();
154         }
155         catch(JMSException JavaDoc e) {
156             // TO_DO: cannot deliver, a failure handler should take over
157
log.error("Unhandled failure", e);
158             return;
159         }
160         if (l == null) {
161             log.warn("No message listener for receiver "+receiverID+". Delivery failed!");
162         }
163         else {
164             l.onMessage(m);
165         }
166     }
167     
168
169     //
170
// Session INTERFACE IMPLEMEMENTATION
171
//
172

173     public BytesMessage JavaDoc createBytesMessage() throws JMSException JavaDoc {
174         throw new NotImplementedException();
175     }
176
177     public MapMessage JavaDoc createMapMessage() throws JMSException JavaDoc {
178         throw new NotImplementedException();
179     }
180
181     public Message JavaDoc createMessage() throws JMSException JavaDoc {
182         throw new NotImplementedException();
183     }
184
185     public ObjectMessage JavaDoc createObjectMessage() throws JMSException JavaDoc {
186         throw new NotImplementedException();
187     }
188
189     public ObjectMessage JavaDoc createObjectMessage(Serializable JavaDoc object) throws JMSException JavaDoc {
190         throw new NotImplementedException();
191     }
192
193     public StreamMessage JavaDoc createStreamMessage() throws JMSException JavaDoc {
194         throw new NotImplementedException();
195     }
196
197     public TextMessage JavaDoc createTextMessage() throws JMSException JavaDoc {
198         return new TextMessageImpl();
199     }
200
201     public TextMessage JavaDoc createTextMessage(String JavaDoc text) throws JMSException JavaDoc {
202         throw new NotImplementedException();
203     }
204
205     public boolean getTransacted() throws JMSException JavaDoc {
206         return transacted;
207     }
208     
209     public int getAcknowledgeMode() throws JMSException JavaDoc {
210         return acknowledgeMode;
211     }
212
213     public void commit() throws JMSException JavaDoc {
214         throw new NotImplementedException();
215     }
216
217     public void rollback() throws JMSException JavaDoc {
218         throw new NotImplementedException();
219     }
220
221     public void close() throws JMSException JavaDoc {
222         throw new NotImplementedException();
223     }
224
225     public void recover() throws JMSException JavaDoc {
226         throw new NotImplementedException();
227     }
228
229     public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc {
230         throw new NotImplementedException();
231     }
232
233     public void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc {
234         throw new NotImplementedException();
235     }
236
237     public void run() {
238         throw new NotImplementedException();
239     }
240     
241     public MessageProducer JavaDoc createProducer(Destination JavaDoc destination) throws JMSException JavaDoc {
242         
243         if (destination instanceof Topic JavaDoc) {
244             return new TopicPublisherImpl(this, (Topic JavaDoc)destination);
245         }
246         else if (destination instanceof Queue JavaDoc) {
247             return new QueueSenderImpl(this, (Queue JavaDoc)destination);
248         }
249         throw new JMSException JavaDoc("Destination not a Topic or Queue");
250     }
251
252     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination) throws JMSException JavaDoc {
253         
254         if (destination instanceof Topic JavaDoc) {
255             TopicSubscriberImpl ts = new TopicSubscriberImpl(this, (Topic JavaDoc)destination);
256             subscribers.add(ts);
257             return ts;
258         }
259         else if (destination instanceof Queue JavaDoc) {
260             QueueReceiverImpl qr =
261                 new QueueReceiverImpl(this, generateReceiverID(), (Queue JavaDoc)destination);
262             sessionManager.advertiseQueueReceiver(getID(), qr, true);
263             receivers.add(qr);
264             return qr;
265         }
266         throw new JMSException JavaDoc("Destination not a Topic or Queue");
267     }
268
269
270     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector)
271         throws JMSException JavaDoc {
272         throw new NotImplementedException();
273     }
274
275     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination,
276                                           String JavaDoc messageSelector,
277                                           boolean NoLocal)
278         throws JMSException JavaDoc {
279         throw new NotImplementedException();
280     }
281     
282     public Queue JavaDoc createQueue(String JavaDoc queueName) throws JMSException JavaDoc {
283         throw new NotImplementedException();
284     }
285     
286     public Topic JavaDoc createTopic(String JavaDoc topicName) throws JMSException JavaDoc {
287         throw new NotImplementedException();
288     }
289
290     public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic,
291                                                    String JavaDoc name) throws JMSException JavaDoc {
292         throw new NotImplementedException();
293     }
294
295     public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic,
296                                                    String JavaDoc name,
297                                                    String JavaDoc messageSelector,
298                                                    boolean noLocal) throws JMSException JavaDoc {
299         throw new NotImplementedException();
300     }
301     
302     public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue) throws JMSException JavaDoc {
303         throw new NotImplementedException();
304     }
305
306     public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue,
307                                       String JavaDoc messageSelector) throws JMSException JavaDoc {
308         throw new NotImplementedException();
309     }
310
311     public TemporaryQueue JavaDoc createTemporaryQueue() throws JMSException JavaDoc {
312         throw new NotImplementedException();
313     }
314    
315     public TemporaryTopic JavaDoc createTemporaryTopic() throws JMSException JavaDoc {
316         throw new NotImplementedException();
317     }
318
319     public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc {
320         throw new NotImplementedException();
321     }
322
323     //
324
// END Session INTERFACE IMPLEMEMENTATION
325
//
326

327     /**
328      * The reverse of createConsumer().
329      **/

330     void removeConsumer(MessageConsumer JavaDoc consumer) throws JMSException JavaDoc {
331
332         if (consumer instanceof QueueReceiverImpl) {
333             if (!receivers.contains(consumer)) {
334                 throw new JMSException JavaDoc("No such QueueReceiver: "+consumer);
335             }
336             sessionManager.advertiseQueueReceiver(getID(), (QueueReceiverImpl)consumer, false);
337             receivers.remove(consumer);
338         }
339         else if (consumer instanceof TopicSubscriberImpl) {
340             throw new NotImplementedException();
341         }
342         else {
343             throw new JMSException JavaDoc("MessageConsumer not a TopicSubscriber or a QueueReceiver");
344         }
345     }
346
347     
348     /**
349      * Generate a queue receiver ID that is quaranteed to be unique for the life time of this
350      * Session instance.
351      **/

352     private synchronized String JavaDoc generateReceiverID() {
353         return Integer.toString(receiverCounter++);
354     }
355
356     //
357
// LIST MANAGEMENT METHODS
358
//
359

360     //private QueueReceiverImpl getReceiver(String receiverID)
361

362     //
363
// END OF LIST MANAGEMENT METHODS
364
//
365

366 }
367
Popular Tags