KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > util > BridgeUnifiedModule


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2003 - Bull SA
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): Frederic Maistre (Bull SA)
21  * Contributor(s):
22  */

23 package org.objectweb.joram.mom.util;
24
25 import fr.dyade.aaa.agent.AgentId;
26 import fr.dyade.aaa.agent.Channel;
27 import fr.dyade.aaa.util.Daemon;
28 import org.objectweb.joram.shared.messages.Message;
29
30 import java.util.Enumeration JavaDoc;
31 import java.util.Properties JavaDoc;
32 import java.util.Vector JavaDoc;
33
34 import javax.jms.*;
35 import javax.jms.IllegalStateException JavaDoc;
36
37
38 /**
39  * The <code>BridgeUnifiedModule</code> class is a bridge module based on the
40  * JMS 1.1 unified semantics and classes.
41  */

42 public class BridgeUnifiedModule implements javax.jms.ExceptionListener JavaDoc,
43                                             javax.jms.MessageListener JavaDoc,
44                                             java.io.Serializable JavaDoc
45 {
46   /** Identifier of the agent using this module. */
47   protected AgentId agentId;
48
49   /** Name of the JNDI factory class to use. */
50   protected String JavaDoc jndiFactory = null;
51   /** JNDI URL. */
52   protected String JavaDoc jndiUrl = null;
53   /** ConnectionFactory JNDI name. */
54   protected String JavaDoc cnxFactName;
55   /** Destination JNDI name. */
56   protected String JavaDoc destName;
57   /** Connection factory object for connecting to the foreign JMS server. */
58   protected ConnectionFactory cnxFact = null;
59   /** Foreign JMS destination object. */
60   protected Destination dest = null;
61   /** User identification for connecting to the foreign JMS server. */
62   protected String JavaDoc userName = null;
63   /** User password for connecting to the foreign JMS server. */
64   protected String JavaDoc password = null;
65   /** JMS clientID field. */
66   protected String JavaDoc clientID = null;
67   /** Selector for filtering messages. */
68   protected String JavaDoc selector;
69
70   /** <code>true</code> if the module is fully usable. */
71   protected boolean usable = true;
72   /** Message explaining why the module is not usable. */
73   protected String JavaDoc notUsableMessage;
74
75   /** Connection to the foreign JMS server. */
76   protected transient Connection cnx;
77   /** Session for sending messages to the foreign JMS destination. */
78   protected transient Session producerSession;
79   /** Session for getting messages from the foreign JMS destination. */
80   protected transient Session consumerSession;
81   /** Producer object. */
82   protected transient MessageProducer producer;
83   /** Consumer object. */
84   protected transient MessageConsumer consumer;
85
86   /** <code>true</code> if a listener has been set on the JMS consumer. */
87   protected transient boolean listener;
88   /** Vector holding the pending messages to send after reconnection. */
89   protected transient Vector JavaDoc qout;
90
91   /** Daemon used for requesting messages. */
92   protected transient ConsumerDaemon consumerDaemon;
93   /** Daemon used for the reconnection process. */
94   protected transient ReconnectionDaemon reconnectionDaemon;
95
96
97   /** Constructs a <code>BridgeUnifiedModule</code> module. */
98   public BridgeUnifiedModule()
99   {}
100
101
102   /**
103    * Initializes the module's parameters.
104    *
105    * @param agentId Identifier of the agent using the module.
106    * @param prop JMS properties required for establishing the link with the
107    * foreign JMS server.
108    *
109    * @exception IllegalArgumentException If the provided properties are
110    * invalid.
111    */

112   public void init(AgentId agentId, Properties JavaDoc prop)
113   {
114     this.agentId = agentId;
115
116     jndiFactory = prop.getProperty("jndiFactory");
117     jndiUrl = prop.getProperty("jndiUrl");
118     
119     cnxFactName = prop.getProperty("connectionFactoryName");
120     destName = prop.getProperty("destinationName");
121
122     if (cnxFactName == null)
123       throw new IllegalArgumentException JavaDoc("Missing ConnectionFactory "
124                                          + "JNDI name.");
125     else if (destName == null)
126       throw new IllegalArgumentException JavaDoc("Missing Destination "
127                                          + "JNDI name.");
128
129     String JavaDoc userName = prop.getProperty("userName");
130     String JavaDoc password = prop.getProperty("password");
131
132     if (userName != null && password != null) {
133       this.userName = userName;
134       this.password = password;
135     }
136
137     clientID = prop.getProperty("clientId");
138     selector = prop.getProperty("selector");
139   }
140
141   /**
142    * Launches the connection process to the foreign JMS server.
143    *
144    * @exception javax.jms.IllegalStateException If the module can't access
145    * the foreign JMS server.
146    * @exception javax.jms.JMSException If the needed JMS resources can't be
147    * created.
148    */

149   public void connect() throws JMSException
150   {
151     if (! usable)
152       throw new IllegalStateException JavaDoc(notUsableMessage);
153
154     listener = false;
155     // Creating the module's daemons.
156
consumerDaemon = new ConsumerDaemon();
157     reconnectionDaemon = new ReconnectionDaemon();
158
159     // Administered objects have not been retrieved: launching the startup
160
// daemon.
161
if (cnxFact == null || dest == null) {
162       StartupDaemon startup = new StartupDaemon();
163       startup.start();
164     }
165     // Administered objects have been retrieved: connecting.
166
else {
167       try {
168         doConnect();
169       }
170       catch (JMSException exc) {
171         reconnectionDaemon.reconnect();
172       }
173     }
174   }
175
176   /**
177    * Sets a message listener on the foreign JMS destination.
178    *
179    * @exception javax.jms.IllegalStateException If the module state does
180    * not allow to set a listener.
181    */

182   public void setMessageListener() throws IllegalStateException JavaDoc
183   {
184     if (! usable)
185       throw new IllegalStateException JavaDoc(notUsableMessage);
186
187     listener = true;
188     try {
189       setConsumer();
190       consumer.setMessageListener(this);
191       cnx.start();
192     }
193     catch (JMSException exc) {}
194   }
195
196   /**
197    * Unsets the set message listener on the foreign JMS destination.
198    */

199   public void unsetMessageListener()
200   {
201     try {
202       cnx.stop();
203       consumer.setMessageListener(null);
204       unsetConsumer();
205     }
206     catch (JMSException exc) {}
207     listener = false;
208   }
209
210   /**
211    * Synchronous method requesting an immediate delivery from the foreign
212    * JMS destination.
213    *
214    * @return The JMS message formatted into a JORAM MOM message, or
215    * <code>null</code> if no message is available or if the request
216    * fails.
217    *
218    * @exception javax.jms.IllegalStateException If the module state does
219    * not allow to request a message.
220    */

221   public Message receiveNoWait() throws IllegalStateException JavaDoc
222   {
223     if (! usable)
224       throw new IllegalStateException JavaDoc(notUsableMessage);
225
226     Message momMessage = null;
227     try {
228       setConsumer();
229       cnx.start();
230       try {
231         momMessage = MessageConverterModule.convert(consumer.receiveNoWait());
232         consumerSession.commit();
233       }
234       // Conversion error: denying the message.
235
catch (MessageFormatException exc) {
236         consumerSession.rollback();
237       }
238     }
239     // Connection start, or session commit/rollback failed:
240
// setting the message to null.
241
catch (JMSException commitExc) {
242       momMessage = null;
243     }
244     return momMessage;
245   }
246
247   /**
248    * Asynchronous method requesting a delivery from the foreign
249    * JMS destination.
250    *
251    * @exception javax.jms.IllegalStateException If the module state does
252    * not allow to request a message.
253    */

254   public void receive() throws IllegalStateException JavaDoc
255   {
256     if (! usable)
257       throw new IllegalStateException JavaDoc(notUsableMessage);
258
259     consumerDaemon.receive();
260   }
261   
262   /**
263    * Sends a message to the foreign JMS destination.
264    *
265    * @exception javax.jms.IllegalStateException If the module's state does
266    * not permit message sendings.
267    * @exception javax.jms.MessageFormatException If the MOM message could not
268    * be converted into a foreign JMS message.
269    */

270   public void send(org.objectweb.joram.shared.messages.Message message)
271               throws JMSException
272   {
273     if (! usable)
274       throw new IllegalStateException JavaDoc(notUsableMessage);
275
276     try {
277       producer.send(MessageConverterModule.convert(producerSession, message));
278       acknowledge(message);
279     }
280     catch (javax.jms.MessageFormatException JavaDoc exc) {
281       throw exc;
282     }
283     // Connection failure? Keeping the message for later delivery.
284
catch (javax.jms.JMSException JavaDoc exc) {
285       qout.add(message);
286     }
287   }
288
289   /**
290    * Interrupts the daemons and closes the connection.
291    */

292   public void close()
293   {
294     try {
295       cnx.stop();
296     }
297     catch (JMSException exc) {}
298
299     unsetMessageListener();
300
301     try {
302       consumerDaemon.interrupt();
303     }
304     catch (Exception JavaDoc exc) {}
305     try {
306       reconnectionDaemon.interrupt();
307     }
308     catch (Exception JavaDoc exc) {}
309
310     try {
311       cnx.close();
312     }
313     catch (JMSException exc) {}
314   }
315
316   /**
317    * Implements the <code>javax.jms.ExceptionListener</code> interface for
318    * catching the failures of the connection to the remote JMS server.
319    * <p>
320    * Reacts by launching a reconnection process.
321    */

322   public void onException(JMSException exc)
323   {
324     reconnectionDaemon.reconnect();
325   }
326
327   /**
328    * Implements the <code>javax.jms.MessageListener</code> interface for
329    * processing the asynchronous deliveries coming from the foreign JMS
330    * server.
331    */

332   public void onMessage(javax.jms.Message JavaDoc jmsMessage)
333   {
334     try {
335       try {
336         Message momMessage = MessageConverterModule.convert(jmsMessage);
337         consumerSession.commit();
338         Channel.sendTo(agentId, new BridgeDeliveryNot(momMessage));
339       }
340       // Conversion error: denying the message.
341
catch (MessageFormatException conversionExc) {
342         consumerSession.rollback();
343       }
344     }
345     // Commit or rollback failed: nothing to do.
346
catch (JMSException exc) {}
347   }
348
349   /**
350    * Opens a connection with the foreign JMS server and creates the
351    * JMS resources for interacting with the foreign JMS destination.
352    *
353    * @exception JMSException If the needed JMS resources could not be created.
354    */

355   protected void doConnect() throws JMSException
356   {
357     if (userName != null && password != null)
358       cnx = cnxFact.createConnection(userName, password);
359     else
360       cnx = cnxFact.createConnection();
361     cnx.setExceptionListener(this);
362
363     if (clientID != null)
364       cnx.setClientID(clientID);
365
366     producerSession = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
367     producer = producerSession.createProducer(dest);
368
369     consumerSession = cnx.createSession(true, 0);
370   }
371
372   /**
373    * Sets the JMS consumer on the foreign destination.
374    *
375    * @exception JMSException If the JMS consumer could not be created.
376    */

377   protected void setConsumer() throws JMSException
378   {
379     if (consumer != null)
380       return;
381
382     try {
383       if (dest instanceof Queue)
384         consumer = consumerSession.createConsumer(dest, selector);
385       else
386         consumer = consumerSession.createDurableSubscriber((Topic) dest,
387                                                            agentId.toString(),
388                                                            selector,
389                                                            false);
390     }
391     catch (JMSException exc) {
392       throw exc;
393     }
394     catch (Exception JavaDoc exc) {
395       throw new JMSException("JMS resources do not allow to create consumer: "
396                              + exc);
397     }
398   }
399
400   /**
401    * Unsets the JMS consumer.
402    */

403   protected void unsetConsumer()
404   {
405     try {
406       if (dest instanceof Topic)
407         consumerSession.unsubscribe(agentId.toString());
408
409       consumer.close();
410     }
411     catch (Exception JavaDoc exc) {}
412
413     consumer = null;
414   }
415
416   /**
417    * Acknowledges a message successfuly delivered to the foreign JMS server.
418    */

419   protected void acknowledge(Message message)
420   {
421     Channel.sendTo(agentId, new BridgeAckNot(message.getIdentifier()));
422   }
423
424
425   /**
426    * The <code>StartupDaemon</code> thread is responsible for retrieving
427    * the needed JMS administered objects from the JNDI server.
428    */

429   protected class StartupDaemon extends Daemon
430   {
431     /** Constructs a <code>StartupDaemon</code> thread. */
432     protected StartupDaemon()
433     {
434       super(agentId.toString() + ":StartupDaemon");
435       setDaemon(false);
436     }
437
438     /** The daemon's loop. */
439     public void run()
440     {
441       javax.naming.Context JavaDoc jndiCtx = null;
442       try {
443         canStop = true;
444
445         // Administered objects still to be retrieved: getting them from
446
// JNDI.
447
if (cnxFact == null || dest == null) {
448           if (jndiFactory == null || jndiUrl == null)
449             jndiCtx = new javax.naming.InitialContext JavaDoc();
450           else {
451             java.util.Hashtable JavaDoc env = new java.util.Hashtable JavaDoc();
452             env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
453             env.put(javax.naming.Context.PROVIDER_URL, jndiUrl);
454             jndiCtx = new javax.naming.InitialContext JavaDoc(env);
455           }
456           cnxFact = (ConnectionFactory) jndiCtx.lookup(cnxFactName);
457           dest = (Destination) jndiCtx.lookup(destName);
458         }
459         try {
460           doConnect();
461         }
462         catch (AbstractMethodError JavaDoc exc) {
463           usable = false;
464           notUsableMessage = "Retrieved administered objects types not "
465                            + "compatible with the 'unified' communication "
466                            + " mode: " + exc;
467         }
468         catch (ClassCastException JavaDoc exc) {
469           usable = false;
470           notUsableMessage = "Retrieved administered objects types not "
471                            + "compatible with the chosen communication mode: "
472                            + exc;
473         }
474         catch (JMSSecurityException exc) {
475           usable = false;
476           notUsableMessage = "Provided user identification does not allow "
477                            + "to connect to the foreign JMS server: "
478                            + exc;
479         }
480         catch (JMSException exc) {
481           reconnectionDaemon.reconnect();
482         }
483         catch (Throwable JavaDoc exc) {
484           usable = false;
485           notUsableMessage = "" + exc;
486         }
487       }
488       catch (javax.naming.NameNotFoundException JavaDoc exc) {
489         usable = false;
490         if (cnxFact == null)
491           notUsableMessage = "Could not retrieve ConnectionFactory ["
492                              + cnxFactName
493                              + "] from JNDI: " + exc;
494         else if (dest == null)
495           notUsableMessage = "Could not retrieve Destination ["
496                              + destName
497                              + "] from JNDI: " + exc;
498       }
499       catch (javax.naming.NamingException JavaDoc exc) {
500         usable = false;
501         notUsableMessage = "Could not access JNDI: " + exc;
502       }
503       catch (ClassCastException JavaDoc exc) {
504         usable = false;
505         notUsableMessage = "Error while retrieving administered objects "
506                            + "through JNDI possibly because of missing "
507                            + "foreign JMS client libraries in classpath: "
508                            + exc;
509       }
510       catch (Exception JavaDoc exc) {
511         usable = false;
512         notUsableMessage = "Error while retrieving administered objects "
513                            + "through JNDI: "
514                            + exc;
515       }
516       finally {
517         // Closing the JNDI context.
518
try {
519           jndiCtx.close();
520         }
521         catch (Exception JavaDoc exc) {}
522
523         finish();
524       }
525     }
526
527     /** Shuts the daemon down. */
528     public void shutdown()
529     {}
530
531     /** Releases the daemon's resources. */
532     public void close()
533     {}
534   }
535
536   /**
537    * The <code>ReconnectionDaemon</code> thread is responsible for reconnecting
538    * the bridge module with the foreign JMS server in case of disconnection.
539    */

540   protected class ReconnectionDaemon extends Daemon
541   {
542     /** Number of reconnection trials of the first step. */
543     private int attempts1 = 30;
544     /** Retry interval (in milliseconds) of the first step. */
545     private long interval1 = 1000L;
546     /** Number of reconnection trials of the second step. */
547     private int attempts2 = 55;
548     /** Retry interval (in milliseconds) of the second step. */
549     private long interval2 = 5000L;
550     /** Retry interval (in milliseconds) of the third step. */
551     private long interval3 = 60000L;
552
553     /** Constructs a <code>ReconnectionDaemon</code> thread. */
554     protected ReconnectionDaemon()
555     {
556       super(agentId.toString() + ":ReconnectionDaemon");
557       setDaemon(false);
558     }
559
560     /** Notifies the daemon to start reconnecting. */
561     protected void reconnect() {
562       if (running)
563         return;
564
565       consumer = null;
566       start();
567     }
568
569     /** The daemon's loop. */
570     public void run()
571     {
572       int attempts = 0;
573       long interval;
574       Message msg;
575
576       try {
577         while (running) {
578           canStop = true;
579
580           attempts++;
581
582           if (attempts <= 30)
583             interval = interval1;
584           else if (attempts <= 55)
585             interval = interval2;
586           else
587             interval = interval3;
588
589           try {
590             Thread.sleep(interval);
591             doConnect();
592             
593             // Setting the listener, if any.
594
if (listener)
595               setMessageListener();
596             // Starting the consumer daemon:
597
consumerDaemon.start();
598             // Sending the pending messages, if any:
599
while (! qout.isEmpty())
600               send((Message) qout.remove(0));
601           }
602           catch (Exception JavaDoc exc) {
603             continue;
604           }
605           canStop = false;
606           break;
607         }
608       }
609       finally {
610         finish();
611       }
612     }
613
614     /** Shuts the daemon down. */
615     public void shutdown()
616     {}
617
618     /** Releases the daemon's resources. */
619     public void close()
620     {}
621   }
622
623   /**
624    * The <code>ConsumerDaemon</code> thread allows to call
625    * <code>MessageConsumer.receive()</code> for requesting a foreign JMS
626    * message without blocking the JORAM server.
627    */

628   protected class ConsumerDaemon extends Daemon
629   {
630     /** Counter of pending "receive" requests. */
631     private int requests = 0;
632
633
634     /** Constructs a <code>ReceiverDaemon</code> thread. */
635     protected ConsumerDaemon()
636     {
637       super(agentId.toString() + ":ConsumerDaemon");
638       setDaemon(false);
639     }
640
641     /** Notifies the daemon of a new "receive" request. */
642     protected synchronized void receive()
643     {
644       requests++;
645
646       if (running)
647         return;
648
649       start();
650     }
651
652     /** The daemon's loop. */
653     public void run()
654     {
655       try {
656         Message momMessage;
657         BridgeDeliveryNot notif;
658
659         setConsumer();
660         cnx.start();
661
662         while (requests > 0 && running) {
663           canStop = true;
664
665           // Expecting a message:
666
try {
667             momMessage = MessageConverterModule.convert(consumer.receive());
668             consumerSession.commit();
669           }
670           // Conversion error: denying the message.
671
catch (MessageFormatException messageExc) {
672             consumerSession.rollback();
673             continue;
674           }
675           // Processing the delivery.
676
canStop = false;
677           notif = new BridgeDeliveryNot(momMessage);
678           Channel.sendTo(agentId, notif);
679           requests--;
680         }
681       }
682       // Connection loss?
683
catch (JMSException exc) {}
684       finally {
685         finish();
686       }
687     }
688
689     /** Shuts the daemon down. */
690     public void shutdown()
691     {}
692
693     /** Releases the daemon's resources. */
694     public void close()
695     {}
696   }
697
698
699   /** Deserializes a <code>BridgeUnifiedModule</code> instance. */
700   private void readObject(java.io.ObjectInputStream JavaDoc in)
701                throws java.io.IOException JavaDoc, ClassNotFoundException JavaDoc
702   {
703     in.defaultReadObject();
704     qout = new Vector JavaDoc();
705   }
706 }
707
Popular Tags