KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > Jt > jms > JtJMSQueueAdapter


1
2
3 package Jt.jms;
4
5 import Jt.*;
6 import java.io.*;
7 import Jt.jndi.*;
8 import javax.jms.*;
9 import javax.naming.*;
10
11 /**
12  * Jt Adapter for the JMS point-to-point API.
13  */

14
15 public class JtJMSQueueAdapter extends JtAdapter implements MessageListener {
16
17   private String JavaDoc queue;
18   private String JavaDoc connectionFactory;
19   private long timeout = 1L; // Receives the next message within the timeout interval
20
private Object JavaDoc subject = null;
21   private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
22   private int priority = Message.DEFAULT_PRIORITY;
23   private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; // message never expires
24

25
26   private transient JtJNDIAdapter jndiAdapter = null;
27   private transient boolean initted = false;
28
29   private transient Queue jmsQueue;
30   private transient QueueConnectionFactory qcFactory;
31   private transient QueueConnection queueConnection;
32   private transient QueueSession queueSession;
33   private transient QueueSender queueSender;
34   private transient QueueReceiver queueReceiver;
35
36   // Intialize the JMS Adapter
37

38   private void initial () {
39     JtMessage msg = new JtMessage ("JtLOOKUP");
40
41         
42     jndiAdapter = new JtJNDIAdapter ();
43
44     if (connectionFactory == null) {
45       handleError ("Attribute connectionFactory needs to be set.");
46       return;
47     }
48
49     msg.setMsgContent (connectionFactory);
50  
51     qcFactory = (QueueConnectionFactory) sendMessage (jndiAdapter, msg);
52
53     if (qcFactory == null)
54       return;
55
56     if (queue == null) {
57       handleError ("Attribute queue needs to be set.");
58       return;
59     }
60     msg.setMsgContent (queue);
61
62     jmsQueue = (Queue) sendMessage (jndiAdapter, msg);
63
64
65     if (jmsQueue == null)
66       return;
67
68     try {
69       queueConnection = qcFactory.createQueueConnection ();
70       queueSession = queueConnection.createQueueSession (false,
71                                  Session.AUTO_ACKNOWLEDGE);
72
73     } catch (Exception JavaDoc e) {
74       handleException (e);
75     }
76
77   }
78
79   /**
80     * Method used by this adapter to consume JMS messages
81     * asynchronously.
82     * @param message JMS message
83     */

84
85   public void onMessage (Message message) {
86     JtMessage msg;
87     ObjectMessage omessage;
88
89     if (message == null)
90       return;
91
92     
93     try {
94
95       omessage = (ObjectMessage) message;
96       msg = (JtMessage) omessage.getObject ();
97
98       if (subject == null) {
99         handleWarning ("JtJMSQueueAdapter.onMessage: attribute 'subject' needs to be set.");
100         return;
101       }
102
103       // Forward Jt messages to the subject object
104

105       sendMessage (subject, msg);
106
107     } catch (Exception JavaDoc ex) {
108       handleException (ex);
109     }
110   }
111   
112   /**
113     * Process object messages.
114     * <ul>
115     * <li> JtSEND - Send a JtMessage (msgContent) to the JMS queue.
116     * <li> JtRECEIVE - Receive a JtMessage from the JMS queue and return it.
117     * <li> The message is consumed synchronously.
118     * <li> JtSTART_LISTENING - Start listening and consume messages asynchronously.
119     * </ul>
120     */

121
122   public Object JavaDoc processMessage (Object JavaDoc message) {
123   String JavaDoc content;
124   String JavaDoc query;
125   JtMessage e = (JtMessage) message;
126   Object JavaDoc reply;
127   JtMessage msg;
128
129
130       if (e == null || (e.getMsgId() == null))
131           return (null);
132
133
134       if (e.getMsgId().equals ("JtREMOVE")) {
135     return (null);
136       }
137
138       if (!initted) {
139         initial ();
140         initted = true;
141       }
142
143
144       if (e.getMsgId().equals("JtSEND")) {
145         msg = (JtMessage) e.getMsgContent ();
146         reply = sendJMSMessage (msg);
147         return (reply);
148       }
149
150
151       if (e.getMsgId().equals("JtSTART_LISTENING")) {
152         startListening ();
153         return (null);
154       }
155
156       if (e.getMsgId().equals("JtRECEIVE")) {
157         reply = receiveJMSMessage ();
158         return (reply);
159       }
160
161       // Test the Sender functionality
162

163       if (e.getMsgId().equals("JtTEST_SENDER")) {
164     reply = testSender ();
165     return (reply);
166       }
167
168       // Test the Receiver functionality
169

170       if (e.getMsgId().equals("JtTEST_RECEIVER")) {
171     reply = testReceiver ();
172     return (reply);
173       }
174
175
176       handleError
177     ("processMessage: invalid message id:"+
178         e.getMsgId());
179       return (null);
180   }
181
182
183
184   /**
185    * Specifies the JNDI name of the JMS queue.
186    * @param queue queue
187    */

188
189   public void setQueue (String JavaDoc queue) {
190     this.queue = queue;
191   }
192
193
194   /**
195    * Returns the JNDI name of the JMS queue.
196    */

197
198   public String JavaDoc getQueue () {
199     return (queue);
200   }
201
202   /**
203    * Specifies the timeout interval (refer to javax.jms.MessageConsumer).
204    * @param timeout timeout
205    */

206
207   public void setTimeout (long timeout) {
208     this.timeout = timeout;
209   }
210
211
212   /**
213    * Returns timeout (refer to javax.jms.MessageConsumer).
214    */

215
216   public long getTimeout () {
217     return (timeout);
218   }
219
220
221   /**
222    * Sets the delivery mode (persistent or non-persistent).
223    * Messages will be sent to the JMS queue using this delivery mode.
224    * @param deliveryMode delivery mode
225    */

226
227   public void setDeliveryMode (int deliveryMode) {
228     this.deliveryMode = deliveryMode;
229   }
230
231
232   /**
233    * Returns the delivery mode (persistent or non-persistent)
234    */

235
236   public long getDeliveryMode () {
237     return (deliveryMode);
238   }
239
240
241
242
243   /**
244    * Sets the message priority. Messages will be sent to the JMS queue
245    * using this priority.
246    * @param priority message priority
247    */

248
249   public void setPriority (int priority) {
250     this.priority = priority;
251   }
252
253
254   /**
255    * Returns the message priority.
256    */

257
258   public long getPriority () {
259     return (priority);
260   }
261
262
263   /**
264    * Sets the message time to live (in milliseconds). Messages will be sent to the JMS queue
265    * using this value.
266    * @param timeToLive message time to live
267    */

268
269   public void setTimeToLive (long timeToLive) {
270     this.timeToLive = timeToLive;
271   }
272
273
274   /**
275    * Returns the message time to live (in milliseconds).
276    */

277
278   public long getTimeToLive () {
279     return (timeToLive);
280   }
281
282
283
284   /**
285    * Specifies the subject (JtObject). Messages received asynchronously are forwarded to
286    * this Jt object for processing.
287    * @param subject subject
288    */

289
290   public void setSubject (Object JavaDoc subject) {
291     this.subject = subject;
292   }
293
294
295   /**
296    * Returns the subject. Messages received asynchronously are forwarded to
297    * this Jt object for processing.
298    */

299
300   public Object JavaDoc getSubject () {
301     return (subject);
302   }
303
304
305   /**
306    * Specifies the JNDI name of the connection factory.
307    * @param connectionFactory connection factory
308    */

309
310   public void setConnectionFactory (String JavaDoc connectionFactory) {
311     this.connectionFactory = connectionFactory;
312   }
313
314
315   /**
316    * Returns the JNDI name of the connection factory.
317    */

318
319   public String JavaDoc getConnectionFactory () {
320     return (connectionFactory);
321   }
322
323
324
325
326   private Object JavaDoc testReceiver () {
327     String JavaDoc reply = "PASS";
328     ObjectMessage message;
329     JtMessage msg;
330
331  
332
333     //for (;;) {
334

335       msg = (JtMessage) sendMessage (this, new JtMessage ("JtRECEIVE"));
336
337       if (msg == null) {
338         System.out.println ("no more messages");
339         return (reply);
340       }
341
342       System.out.println ("msgId:" + msg.getMsgId ());
343
344     //}
345

346     return (reply);
347   }
348
349
350   // Send a Jt message using JMS as the transport layer
351

352   private Object JavaDoc sendJMSMessage (JtMessage msg) {
353
354     ObjectMessage omsg;
355     String JavaDoc reply = "PASS";
356
357
358     if (msg == null) {
359       reply = "FAIL";
360       return (reply);
361     }
362
363     try {
364
365       if (queueSender == null)
366         queueSender = queueSession.createSender (jmsQueue);
367
368       omsg = queueSession.createObjectMessage ();
369       omsg.setObject (msg);
370
371       // send the message. Use the appropriate parameters (priority,
372
// deliveryMode, etc).
373
queueSender.send (omsg, deliveryMode, priority, timeToLive);
374     } catch (Exception JavaDoc e) {
375       handleException (e);
376       reply = "FAIL";
377     }
378     return (reply);
379   }
380
381   private void startListening () {
382
383
384     try {
385       if (queueReceiver == null)
386         queueReceiver = queueSession.createReceiver (jmsQueue);
387
388
389       if (queueConnection == null) {
390         handleError ("receiveJMSMessage:queueConnection is null");
391         return;
392       }
393
394       // Use the adapter as the message listener
395

396       queueReceiver.setMessageListener (this);
397
398       queueConnection.start ();
399     } catch (Exception JavaDoc ex) {
400
401       handleException (ex);
402     }
403
404   }
405   
406   private JtMessage receiveJMSMessage () {
407
408     JtMessage msg = null;
409     ObjectMessage message;
410
411
412     try {
413
414       if (queueReceiver == null)
415         queueReceiver = queueSession.createReceiver (jmsQueue);
416
417
418       if (queueConnection == null) {
419         handleError ("receiveJMSMessage:queueConnection is null");
420         return (null);
421       }
422       queueConnection.start ();
423
424       message = (ObjectMessage) queueReceiver.receive (timeout);
425       if (message != null) {
426         msg = (JtMessage) message.getObject ();
427       }
428
429     } catch (Exception JavaDoc e) {
430       handleException (e);
431     }
432
433     return (msg);
434
435   }
436
437
438
439   private Object JavaDoc testSender () {
440     String JavaDoc reply = "PASS";
441     TextMessage message;
442     ObjectMessage omsg;
443     JtMessage msg = new JtMessage ("JtHELLO");
444     JtMessage wrapper = new JtMessage ("JtSEND");
445
446     wrapper.setMsgContent (msg); // wrapper message ("JtSEND") that contains
447
// the real message ("JtHELLO")
448

449
450     // send the message to the JMS adapter (JMS queue)
451
return (sendMessage (this, wrapper));
452   }
453
454   /**
455    * Unit tests the messages processed by JtJMSQueueAdapter.
456    */

457
458   public static void main (String JavaDoc[] args) {
459     JtFactory main;
460     JtJMSQueueAdapter jmsAdapter;
461
462     main = new JtFactory ();
463
464
465     jmsAdapter = (JtJMSQueueAdapter) main.createObject ("Jt.jms.JtJMSQueueAdapter", "jmsAdapter");
466
467     if (args.length < 1) {
468     System.err.println ("Usage: java Jt.jms.JtJMSQueueAdapter -s or java Jt.jms.JtJMSQueueAdapter -r");
469     System.exit (1);
470     }
471
472     if (args[0].equals ("-s")) {
473       main.sendMessage (jmsAdapter, new JtMessage ("JtTEST_SENDER"));
474       System.exit (0);
475     }
476
477     if (args[0].equals ("-r")) {
478       main.sendMessage (jmsAdapter, new JtMessage ("JtTEST_RECEIVER"));
479       System.exit (0);
480     }
481     
482     System.err.println ("Usage: java Jt.jms.JtJMSQueueAdapter -s or java Jt.jms.JtJMSQueueAdapter -r");
483
484     main.removeObject ("jtAdapter");
485
486   }
487 }
Popular Tags