KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > Jt > jms > JtJMSTopicAdapter


1
2
3 package Jt.jms;
4
5 import Jt.*;
6 import java.io.*;
7 import Jt.jndi.*;
8 import javax.jms.*;
9 import javax.naming.*;
10
11 /**
12  * Jt Adapter for the JMS publish/subscribe API.
13  */

14
15 public class JtJMSTopicAdapter extends JtAdapter implements MessageListener {
16
17   private String JavaDoc topic;
18   private String JavaDoc connectionFactory;
19   private long timeout = 1L; // Receives the next message within the timeout interval
20
private Object JavaDoc subject = null;
21   private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
22   private int priority = Message.DEFAULT_PRIORITY;
23   private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; // message never expires
24

25   private transient JtJNDIAdapter jndiAdapter = null;
26   private transient boolean initted = false;
27
28   private transient Topic jmsTopic;
29   private transient TopicConnectionFactory tcFactory;
30   private transient TopicConnection topicConnection;
31   private transient TopicSession topicSession;
32   private transient TopicPublisher topicPublisher;
33   private transient TopicSubscriber topicSubscriber;
34
35   // Intialize the JMS Adapter
36

37   private void initial () {
38     JtMessage msg = new JtMessage ("JtLOOKUP");
39
40         
41     jndiAdapter = new JtJNDIAdapter ();
42
43     if (connectionFactory == null) {
44       handleError ("Attribute value needs to be set (connectionFactory)");
45       return;
46     }
47
48     msg.setMsgContent ("TestJMSConnectionFactory");
49  
50     tcFactory = (TopicConnectionFactory) sendMessage (jndiAdapter, msg);
51
52     if (tcFactory == null)
53       return;
54
55     if (topic == null) {
56       handleError ("Attribute value needs to be set (topic)");
57       return;
58     }
59     msg.setMsgContent (topic);
60
61     jmsTopic = (Topic) sendMessage (jndiAdapter, msg);
62
63
64     if (jmsTopic == null)
65       return;
66
67     try {
68       topicConnection = tcFactory.createTopicConnection ();
69       topicSession = topicConnection.createTopicSession (false,
70                                  Session.AUTO_ACKNOWLEDGE);
71
72     } catch (Exception JavaDoc e) {
73       handleException (e);
74     }
75
76   }
77
78
79   /**
80     * Method used by this adapter to consume JMS messages
81     * asynchronously.
82     * @param message JMS message
83     */

84
85   public void onMessage (Message message) {
86     JtMessage msg;
87     ObjectMessage omessage;
88
89     if (message == null)
90       return;
91
92     
93     try {
94
95       omessage = (ObjectMessage) message;
96       msg = (JtMessage) omessage.getObject ();
97
98       if (subject == null) {
99         handleWarning ("JtJMSAdapter.onMessage: the subject attribute needs to be set");
100         return;
101       }
102
103       sendMessage (subject, msg);
104
105     } catch (Exception JavaDoc ex) {
106       handleException (ex);
107     }
108   }
109   
110   /**
111     * Process object messages.
112     * <ul>
113     * <li> JtPUBLISH - Publish a JtMessage (msgContent).
114     * <li> JtRECEIVE - Receive a JtMessage from the JMS queue and return it.
115     * <li> The message is consumed synchronously.
116     * <li> JtSTART_LISTENING - Start listening and consume messages asynchronously.
117     * </ul>
118     */

119
120   public Object JavaDoc processMessage (Object JavaDoc message) {
121   String JavaDoc content;
122   String JavaDoc query;
123   JtMessage e = (JtMessage) message;
124   Object JavaDoc reply;
125   JtMessage msg;
126
127
128       if (e == null || (e.getMsgId() == null))
129           return (null);
130
131
132       if (e.getMsgId().equals ("JtREMOVE")) {
133     return (null);
134       }
135
136       if (!initted) {
137         initial ();
138         initted = true;
139       }
140
141
142       if (e.getMsgId().equals("JtPUBLISH")) {
143         msg = (JtMessage) e.getMsgContent ();
144         reply = publishJMSMessage (msg);
145         return (reply);
146       }
147
148
149       if (e.getMsgId().equals("JtSTART_LISTENING")) {
150         startListening ();
151         return (null);
152       }
153
154       if (e.getMsgId().equals("JtRECEIVE")) {
155         reply = receiveJMSMessage ();
156         return (reply);
157       }
158
159       // Test the Publisher functionality
160

161       if (e.getMsgId().equals("JtTEST_PUBLISHER")) {
162     reply = testPublisher ();
163     return (reply);
164       }
165
166       // Test the Subscriber functionality
167

168       if (e.getMsgId().equals("JtTEST_SUBSCRIBER")) {
169     reply = testSubscriber ();
170     return (reply);
171       }
172
173
174       handleError
175     ("processMessage: invalid message id:"+
176         e.getMsgId());
177       return (null);
178   }
179
180
181
182   /**
183    * Specifies the JNDI name of the JMS topic.
184    * @param topic topic
185    */

186
187   public void setTopic (String JavaDoc topic) {
188     this.topic = topic;
189   }
190
191
192   /**
193    * Returns the JNDI name of the JMS topic.
194    */

195
196   public String JavaDoc getTopic () {
197     return (topic);
198   }
199
200   /**
201    * Specifies the timeout interval (refer to javax.jms.MessageConsumer)
202    * @param timeout timeout
203    */

204
205   public void setTimeout (long timeout) {
206     this.timeout = timeout;
207   }
208
209
210   /**
211    * Returns timeout (refer to javax.jms.MessageConsumer)
212    */

213
214   public long getTimeout () {
215     return (timeout);
216   }
217
218
219   /**
220    * Sets the delivery mode (persistent or non-persistent).
221    * Messages will be published using this delivery mode.
222    * @param deliveryMode delivery mode
223    */

224
225   public void setDeliveryMode (int deliveryMode) {
226     this.deliveryMode = deliveryMode;
227   }
228
229
230   /**
231    * Returns the delivery mode (persistent or non-persistent)
232    */

233
234   public long getDeliveryMode () {
235     return (deliveryMode);
236   }
237
238
239
240
241   /**
242    * Sets the message priority. Messages will be published
243    * using this priority.
244    * @param priority message priority
245    */

246
247   public void setPriority (int priority) {
248     this.priority = priority;
249   }
250
251
252   /**
253    * Returns the message priority.
254    */

255
256   public long getPriority () {
257     return (priority);
258   }
259
260
261   /**
262    * Sets the message time to live (in milliseconds). Messages will be published
263    * using this value.
264    * @param timeToLive message time to live
265    */

266
267   public void setTimeToLive (long timeToLive) {
268     this.timeToLive = timeToLive;
269   }
270
271
272   /**
273    * Returns the message time to live (in milliseconds).
274    */

275
276   public long getTimeToLive () {
277     return (timeToLive);
278   }
279
280   /**
281    * Specifies the subject (JtObject). Messages received asynchronously are forwarded to
282    * this Jt object for processing.
283    * @param subject subject
284    */

285
286
287   public void setSubject (Object JavaDoc subject) {
288     this.subject = subject;
289   }
290
291
292   /**
293    * Returns the subject. Messages received asynchronously are forwarded to
294    * this Jt object for processing.
295    */

296
297   public Object JavaDoc getSubject () {
298     return (subject);
299   }
300
301
302   /**
303    * Specifies the JNDI name of the connection factory.
304    * @param connectionFactory connection factory
305    */

306
307   public void setConnectionFactory (String JavaDoc connectionFactory) {
308     this.connectionFactory = connectionFactory;
309   }
310
311
312   /**
313    * Returns the JNDI name of the connection factory.
314    */

315
316   public String JavaDoc getConnectionFactory () {
317     return (connectionFactory);
318   }
319
320
321
322
323   private Object JavaDoc testSubscriber () {
324     String JavaDoc reply = "PASS";
325     //TextMessage message;
326
ObjectMessage message;
327     JtMessage msg;
328
329  
330
331     for (;;) {
332
333       msg = (JtMessage) sendMessage (this, new JtMessage ("JtRECEIVE"));
334
335       if (msg == null) {
336         System.out.println ("no more messages");
337         break;
338       }
339
340       System.out.println ("msgId:" + msg.getMsgId ());
341
342     }
343
344     return (reply);
345   }
346
347
348   // Send a Jt message using JMS as the transport layer
349

350   private Object JavaDoc publishJMSMessage (JtMessage msg) {
351
352     ObjectMessage omsg;
353     String JavaDoc reply = "PASS";
354
355
356     if (msg == null) {
357       reply = "FAIL";
358       return (reply);
359     }
360
361     try {
362
363       if (topicPublisher == null)
364         topicPublisher = topicSession.createPublisher (jmsTopic);
365
366       omsg = topicSession.createObjectMessage ();
367       omsg.setObject (msg);
368
369
370       topicPublisher.publish (omsg, deliveryMode, priority, timeToLive);
371     } catch (Exception JavaDoc e) {
372       handleException (e);
373       reply = "FAIL";
374     }
375     return (reply);
376   }
377
378   private void startListening () {
379
380
381     try {
382       if (topicSubscriber == null)
383         topicSubscriber = topicSession.createSubscriber (jmsTopic);
384
385
386       if (topicConnection == null) {
387         handleError ("receiveJMSMessage:topicConnection is null");
388         return;
389       }
390
391       // Use the adapter as the message listener
392

393       topicSubscriber.setMessageListener (this);
394
395       topicConnection.start ();
396     } catch (Exception JavaDoc ex) {
397
398       handleException (ex);
399     }
400
401   }
402   
403   private JtMessage receiveJMSMessage () {
404
405     JtMessage msg = null;
406     ObjectMessage message;
407
408
409     try {
410
411       if (topicSubscriber == null)
412         topicSubscriber = topicSession.createSubscriber (jmsTopic);
413
414
415       if (topicConnection == null) {
416         handleError ("receiveJMSMessage:topicConnection is null");
417         return (null);
418       }
419       topicConnection.start ();
420
421       message = (ObjectMessage) topicSubscriber.receive (timeout);
422       if (message != null) {
423         msg = (JtMessage) message.getObject ();
424       }
425
426     } catch (Exception JavaDoc e) {
427       handleException (e);
428     }
429
430     return (msg);
431
432   }
433
434
435
436   private Object JavaDoc testPublisher () {
437     String JavaDoc reply = "PASS";
438     TextMessage message;
439     ObjectMessage omsg;
440     JtMessage msg = new JtMessage ("JtHELLO");
441     JtMessage wrapper = new JtMessage ("JtPUBLISH");
442
443     wrapper.setMsgContent (msg);
444
445
446     return (sendMessage (this, wrapper));
447   }
448
449   /**
450    * Unit tests the messages processed by JtJMSTopicAdapter.
451    */

452
453   public static void main (String JavaDoc[] args) {
454     JtFactory main;
455     JtJMSTopicAdapter jmsAdapter;
456
457     main = new JtFactory ();
458
459
460     jmsAdapter = (JtJMSTopicAdapter) main.createObject ("Jt.jms.JtJMSTopicAdapter", "jmsAdapter");
461
462     if (args.length < 1) {
463     System.err.println ("Usage: java Jt.jms.JtJMSTopicAdapter -p or java Jt.jms.JtJMSTopicAdapter -s");
464     System.exit (1);
465     }
466
467     if (args[0].equals ("-p")) {
468       main.sendMessage (jmsAdapter, new JtMessage ("JtTEST_PUBLISHER"));
469       System.exit (0);
470     } else if (args[0].equals ("-s")) {
471       main.sendMessage (jmsAdapter, new JtMessage ("JtTEST_SUBSCRIBER"));
472       System.exit (0);
473     } else
474       System.err.println ("Usage: java Jt.jms.JtJMSTopicAdapter -p or java Jt.jms.JtJMSTopicAdapter -s");
475
476     main.removeObject (jmsAdapter);
477
478   }
479 }
Popular Tags