KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jfox > jms > JMSSession


1 /* JFox, the OpenSource J2EE Application Server
2  *
3  * Distributable under GNU LGPL license by gnu.org
4  * more details please visit http://www.huihoo.org/jfox
5  */

6
7 package org.jfox.jms;
8
9 import java.io.Serializable JavaDoc;
10 import java.util.HashMap JavaDoc;
11 import java.util.Iterator JavaDoc;
12 import java.util.Map JavaDoc;
13 import javax.jms.BytesMessage JavaDoc;
14 import javax.jms.Destination JavaDoc;
15 import javax.jms.IllegalStateException JavaDoc;
16 import javax.jms.InvalidDestinationException JavaDoc;
17 import javax.jms.JMSException JavaDoc;
18 import javax.jms.MapMessage JavaDoc;
19 import javax.jms.Message JavaDoc;
20 import javax.jms.MessageConsumer JavaDoc;
21 import javax.jms.MessageListener JavaDoc;
22 import javax.jms.MessageProducer JavaDoc;
23 import javax.jms.ObjectMessage JavaDoc;
24 import javax.jms.Queue JavaDoc;
25 import javax.jms.QueueBrowser JavaDoc;
26 import javax.jms.QueueReceiver JavaDoc;
27 import javax.jms.QueueSender JavaDoc;
28 import javax.jms.QueueSession JavaDoc;
29 import javax.jms.Session JavaDoc;
30 import javax.jms.StreamMessage JavaDoc;
31 import javax.jms.TemporaryQueue JavaDoc;
32 import javax.jms.TemporaryTopic JavaDoc;
33 import javax.jms.TextMessage JavaDoc;
34 import javax.jms.Topic JavaDoc;
35 import javax.jms.TopicPublisher JavaDoc;
36 import javax.jms.TopicSession JavaDoc;
37 import javax.jms.TopicSubscriber JavaDoc;
38 import javax.jms.XAQueueSession JavaDoc;
39 import javax.jms.XASession JavaDoc;
40 import javax.jms.XATopicSession JavaDoc;
41 import javax.transaction.xa.XAResource JavaDoc;
42
43 import org.jfox.ioc.util.UUID;
44 import org.jfox.jms.message.BytesMessageImpl;
45 import org.jfox.jms.message.JMSMessage;
46 import org.jfox.jms.message.MapMessageImpl;
47 import org.jfox.jms.message.ObjectMessageImpl;
48 import org.jfox.jms.message.StreamMessageImpl;
49 import org.jfox.jms.message.TextMessageImpl;
50
51 /**
52  * @author <a HREF="mailto:young_yy@hotmail.com">Young Yang</a>
53  */

54
55 public class JMSSession implements Session JavaDoc,
56         QueueSession JavaDoc,
57         TopicSession JavaDoc,
58         XASession JavaDoc,
59         XAQueueSession JavaDoc,
60         XATopicSession JavaDoc {
61     private JMSConnection conn;
62
63     private boolean transacted;
64
65     private int acknowledgeMode;
66
67     private boolean isXA;
68
69     private boolean closed = false;
70
71     private MessageListener JavaDoc listener;
72
73     private Map JavaDoc<String JavaDoc, JMSConsumer> consumers = new HashMap JavaDoc<String JavaDoc, JMSConsumer>();
74
75     private String JavaDoc sessionId = UUID.randomUUID().toString();
76
77     private Map JavaDoc<String JavaDoc, JMSMessage> asyncMessages = new HashMap JavaDoc<String JavaDoc, JMSMessage>();
78
79     public JMSSession(JMSConnection conn, boolean transacted, int acknowledgeMode, boolean isXA) {
80         this.conn = conn;
81         this.transacted = transacted;
82         this.acknowledgeMode = acknowledgeMode;
83         this.isXA = isXA;
84         start();
85     }
86
87     public BytesMessage JavaDoc createBytesMessage() throws JMSException JavaDoc {
88         checkClosed();
89         return new BytesMessageImpl();
90     }
91
92     public MapMessage JavaDoc createMapMessage() throws JMSException JavaDoc {
93         checkClosed();
94         return new MapMessageImpl();
95     }
96
97     public Message JavaDoc createMessage() throws JMSException JavaDoc {
98         checkClosed();
99         return new JMSMessage();
100     }
101
102     public ObjectMessage JavaDoc createObjectMessage() throws JMSException JavaDoc {
103         checkClosed();
104         return new ObjectMessageImpl();
105     }
106
107     public ObjectMessage JavaDoc createObjectMessage(Serializable JavaDoc object) throws JMSException JavaDoc {
108         checkClosed();
109         ObjectMessageImpl om = new ObjectMessageImpl();
110         om.setObject(object);
111         return om;
112     }
113
114     public StreamMessage JavaDoc createStreamMessage() throws JMSException JavaDoc {
115         checkClosed();
116         return new StreamMessageImpl();
117     }
118
119     public TextMessage JavaDoc createTextMessage() throws JMSException JavaDoc {
120         checkClosed();
121         return new TextMessageImpl();
122     }
123
124     public TextMessage JavaDoc createTextMessage(String JavaDoc text) throws JMSException JavaDoc {
125         checkClosed();
126         TextMessageImpl tm = new TextMessageImpl();
127         tm.setText(text);
128         return tm;
129     }
130
131     public boolean getTransacted() throws JMSException JavaDoc {
132         return transacted;
133     }
134
135     public int getAcknowledgeMode() throws JMSException JavaDoc {
136         return acknowledgeMode;
137     }
138
139     public synchronized void commit() throws JMSException JavaDoc {
140         checkClosed();
141         throw new JMSException JavaDoc("not support now!");
142     }
143
144     public synchronized void rollback() throws JMSException JavaDoc {
145         checkClosed();
146         throw new JMSException JavaDoc("not support now!");
147     }
148
149     public synchronized void close() throws JMSException JavaDoc {
150         if (closed) return;
151         this.closed = true;
152         conn.closeSession(sessionId);
153         synchronized (this) {
154             notifyAll();
155         }
156     }
157
158     public synchronized void recover() throws JMSException JavaDoc {
159         throw new JMSException JavaDoc("not support now!");
160     }
161
162     public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc {
163         return listener;
164     }
165
166     public void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc {
167         checkClosed();
168         this.listener = listener;
169         start();
170     }
171
172     public MessageProducer JavaDoc createProducer(Destination JavaDoc destination) throws JMSException JavaDoc {
173         if (destination == null) {
174             throw new InvalidDestinationException JavaDoc("destination is null");
175         }
176         JMSProducer producer = new JMSProducer(this, destination);
177         return producer;
178     }
179
180     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination) throws JMSException JavaDoc {
181         return createConsumer(destination, null);
182     }
183
184     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector) throws JMSException JavaDoc {
185         return createConsumer(destination, messageSelector, false);
186     }
187
188     public synchronized MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination,
189                                                        String JavaDoc messageSelector,
190                                                        boolean NoLocal) throws JMSException JavaDoc {
191         if (destination == null) {
192             throw new InvalidDestinationException JavaDoc("destination is null");
193         }
194         JMSConsumer consumer = new JMSConsumer(this, destination, messageSelector, NoLocal);
195         //在 JMSContainer 中注册consumer
196
getJMSConnection().getContainer().registerConsumer(getJMSConnection().getClientID(), getSessionId(), consumer.getConsumerId(), consumer.getDestination());
197         consumers.put(consumer.getConsumerId(), consumer);
198         return consumer;
199     }
200
201     public Queue JavaDoc createQueue(String JavaDoc queueName) throws JMSException JavaDoc {
202         throw new JMSException JavaDoc("not support now!");
203     }
204
205     public Topic JavaDoc createTopic(String JavaDoc topicName) throws JMSException JavaDoc {
206         throw new JMSException JavaDoc("not support now!");
207     }
208
209     public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name) throws JMSException JavaDoc {
210         throw new JMSException JavaDoc("not support now!");
211     }
212
213     public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic,
214                                                    String JavaDoc name,
215                                                    String JavaDoc messageSelector,
216                                                    boolean noLocal) throws JMSException JavaDoc {
217         throw new JMSException JavaDoc("not support now!");
218     }
219
220     public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue) throws JMSException JavaDoc {
221         throw new JMSException JavaDoc("not support now!");
222     }
223
224     public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc {
225         throw new JMSException JavaDoc("not support now!");
226     }
227
228     public TemporaryQueue JavaDoc createTemporaryQueue() throws JMSException JavaDoc {
229         throw new JMSException JavaDoc("not support now!");
230     }
231
232     public TemporaryTopic JavaDoc createTemporaryTopic() throws JMSException JavaDoc {
233         throw new JMSException JavaDoc("not support now!");
234     }
235
236     public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc {
237         throw new JMSException JavaDoc("not support now!");
238     }
239
240     public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue) throws JMSException JavaDoc {
241         return createReceiver(queue, null);
242     }
243
244     public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc {
245         return (QueueReceiver JavaDoc) createConsumer(queue, messageSelector);
246     }
247
248     public QueueSender JavaDoc createSender(Queue JavaDoc queue) throws JMSException JavaDoc {
249         throw new JMSException JavaDoc("not support now!");
250     }
251
252     public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic) throws JMSException JavaDoc {
253         return createSubscriber(topic, null, false);
254     }
255
256     public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic,
257                                             String JavaDoc messageSelector,
258                                             boolean noLocal) throws JMSException JavaDoc {
259         return (TopicSubscriber JavaDoc) createConsumer(topic, messageSelector, noLocal);
260     }
261
262
263     public TopicPublisher JavaDoc createPublisher(Topic JavaDoc topic) throws JMSException JavaDoc {
264         return (TopicPublisher JavaDoc) createProducer(topic);
265     }
266
267     public Session JavaDoc getSession() throws JMSException JavaDoc {
268         return this;
269     }
270
271     public XAResource JavaDoc getXAResource() {
272         if (isXA == false) {
273             throw new java.lang.IllegalStateException JavaDoc("current session " + this + " is not an XASession");
274         }
275         //TODO: getXAResource
276
return null;
277     }
278
279     public QueueSession JavaDoc getQueueSession() throws JMSException JavaDoc {
280         return (QueueSession JavaDoc) getSession();
281     }
282
283     public TopicSession JavaDoc getTopicSession() throws JMSException JavaDoc {
284         return (TopicSession JavaDoc) getSession();
285     }
286
287     /**
288      * 开始一个线程,异步接收消息
289      */

290     public void run() {
291         while (!closed) {
292             try {
293                 synchronized (this) {
294                     if (asyncMessages.isEmpty()) {
295                         wait();
296                     }
297                     if (closed) break;
298                 }
299                 for (Iterator JavaDoc it = asyncMessages.entrySet().iterator(); it.hasNext();) {
300                     Map.Entry JavaDoc<String JavaDoc, JMSMessage> entry = (Map.Entry JavaDoc<String JavaDoc, JMSMessage>) it.next();
301                     String JavaDoc consumerId = entry.getKey();
302                     JMSMessage message = entry.getValue();
303                     JMSConsumer consumer = consumers.get(consumerId);
304                     message.setSession(this);
305                     message.setConsumer(consumer);
306                     consumer.getMessageListener().onMessage(message);
307                     it.remove();
308                     if (this.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE) {
309                         acknowledge(consumer, message);
310                     }
311                 }
312             } catch (Exception JavaDoc e) {
313                 e.printStackTrace();
314             }
315         }
316     }
317
318     private void checkClosed() throws javax.jms.IllegalStateException JavaDoc {
319         if (closed) {
320             throw new javax.jms.IllegalStateException JavaDoc("connection closed");
321         }
322     }
323
324     /**
325      * get session id
326      *
327      * @return String
328      */

329     protected String JavaDoc getSessionId() {
330         return sessionId;
331     }
332
333     JMSConnection getJMSConnection() {
334         return conn;
335     }
336
337     protected void start() {
338         new Thread JavaDoc(this, "JMSSession-" + sessionId).start();
339     }
340
341     void sendMessage(Message JavaDoc message) throws JMSException JavaDoc {
342         getJMSConnection().getContainer().sendMessage((JMSMessage) message);
343     }
344
345     /**
346      * @param timeout 0 forever; -1 noWait; >1 timeToWait
347      * @return
348      */

349     JMSMessage receiveMessage(JMSConsumer consumer, long timeout) throws JMSException JavaDoc {
350         if (!getJMSConnection().isStarted()) {
351             throw new IllegalStateException JavaDoc("connection " + getJMSConnection().getClientID() + " not started, can't receive message.");
352         }
353         JMSMessage message = getJMSConnection().getContainer().receiveMessage(getJMSConnection().getClientID(),
354                 getSessionId(),
355                 consumer.getConsumerId(),
356                 timeout);
357         // acknowledge message
358
if (getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE) {
359             getJMSConnection().getContainer().acknowledge(getJMSConnection().getClientID(),
360                     getSessionId(),
361                     consumer.getConsumerId(),
362                     message.getJMSMessageID());
363         }
364         return message;
365     }
366
367     protected synchronized void setConsumerAsync(JMSConsumer consumer, boolean async) throws JMSException JavaDoc {
368         getJMSConnection().getContainer().setConsumerAsync(getJMSConnection().getClientID(),
369                 getSessionId(),
370                 consumer.getConsumerId(),
371                 async);
372     }
373
374     protected void onMessage(String JavaDoc consumerId, JMSMessage message) {
375         synchronized (this) {
376             asyncMessages.put(consumerId, message);
377             this.notifyAll();
378         }
379     }
380
381     public void acknowledge(JMSConsumer consumer, JMSMessage message) throws JMSException JavaDoc {
382         getJMSConnection().getContainer().acknowledge(getJMSConnection().getClientID(), sessionId, consumer.getConsumerId(), message.getJMSMessageID());
383     }
384
385     void closeConsumer(String JavaDoc consumerId) {
386         consumers.remove(consumerId);
387     }
388
389     public static void main(String JavaDoc[] args) {
390
391     }
392 }
393
Popular Tags