KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > mom > dest > bridge > BridgeModule


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2003 - Bull SA
4  * Copyright (C) 2007 - ScalAgent Distributed Technologies
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (Bull SA)
22  * Contributor(s): Nicolas Tachker (ScalAgent)
23  */

24 package org.objectweb.joram.mom.dest.bridge;
25
26 import java.util.Properties JavaDoc;
27 import java.util.Vector JavaDoc;
28
29 import javax.jms.Connection JavaDoc;
30 import javax.jms.ConnectionFactory JavaDoc;
31 import javax.jms.Destination JavaDoc;
32 import javax.jms.IllegalStateException JavaDoc;
33 import javax.jms.JMSException JavaDoc;
34 import javax.jms.JMSSecurityException JavaDoc;
35 import javax.jms.MessageConsumer JavaDoc;
36 import javax.jms.MessageFormatException JavaDoc;
37 import javax.jms.MessageProducer JavaDoc;
38 import javax.jms.Queue JavaDoc;
39 import javax.jms.Session JavaDoc;
40 import javax.jms.Topic JavaDoc;
41
42 import org.objectweb.joram.shared.messages.Message;
43
44 import fr.dyade.aaa.agent.AgentId;
45 import fr.dyade.aaa.agent.Channel;
46 import fr.dyade.aaa.util.Daemon;
47
48 /**
49  * The <code>BridgeUnifiedModule</code> class is a bridge module based on the
50  * JMS 1.1 unified semantics and classes.
51  */

52 public class BridgeModule implements javax.jms.ExceptionListener JavaDoc,
53                                             javax.jms.MessageListener JavaDoc,
54                                             java.io.Serializable JavaDoc
55 {
56   /** Identifier of the agent using this module. */
57   protected AgentId agentId;
58
59   /** Name of the JNDI factory class to use. */
60   protected String JavaDoc jndiFactory = null;
61   /** JNDI URL. */
62   protected String JavaDoc jndiUrl = null;
63   /** ConnectionFactory JNDI name. */
64   protected String JavaDoc cnxFactName;
65   /** Destination JNDI name. */
66   protected String JavaDoc destName;
67   /** Connection factory object for connecting to the foreign JMS server. */
68   protected ConnectionFactory JavaDoc cnxFact = null;
69   /** Foreign JMS destination object. */
70   protected Destination JavaDoc dest = null;
71   /** User identification for connecting to the foreign JMS server. */
72   protected String JavaDoc userName = null;
73   /** User password for connecting to the foreign JMS server. */
74   protected String JavaDoc password = null;
75   /** JMS clientID field. */
76   protected String JavaDoc clientID = null;
77   /** Selector for filtering messages. */
78   protected String JavaDoc selector;
79
80   /** <code>true</code> if the module is fully usable. */
81   protected boolean usable = true;
82   /** Message explaining why the module is not usable. */
83   protected String JavaDoc notUsableMessage;
84
85   /** Connection to the foreign JMS server. */
86   protected transient Connection JavaDoc cnx;
87   /** Session for sending messages to the foreign JMS destination. */
88   protected transient Session JavaDoc producerSession;
89   /** Session for getting messages from the foreign JMS destination. */
90   protected transient Session JavaDoc consumerSession;
91   /** Producer object. */
92   protected transient MessageProducer JavaDoc producer;
93   /** Consumer object. */
94   protected transient MessageConsumer JavaDoc consumer;
95
96   /** <code>true</code> if a listener has been set on the JMS consumer. */
97   protected transient boolean listener;
98   /** Vector holding the pending messages to send after reconnection. */
99   protected transient Vector JavaDoc qout;
100
101   /** Daemon used for requesting messages. */
102   protected transient ConsumerDaemon consumerDaemon;
103   /** Daemon used for the reconnection process. */
104   protected transient ReconnectionDaemon reconnectionDaemon;
105
106   /**
107    * Automatic receive for a bridge Queue.
108    * The foreign messages are transfer in bridge queue,
109    * without client request.
110    */

111   private boolean automaticRequest = false;
112
113   /** Constructs a <code>BridgeUnifiedModule</code> module. */
114   public BridgeModule()
115   {}
116
117
118   /**
119    * Initializes the module's parameters.
120    *
121    * @param agentId Identifier of the agent using the module.
122    * @param prop JMS properties required for establishing the link with the
123    * foreign JMS server.
124    *
125    * @exception IllegalArgumentException If the provided properties are
126    * invalid.
127    */

128   public void init(AgentId agentId, Properties JavaDoc prop) {
129     this.agentId = agentId;
130
131     jndiFactory = prop.getProperty("jndiFactory");
132     jndiUrl = prop.getProperty("jndiUrl");
133     
134     cnxFactName = prop.getProperty("connectionFactoryName");
135     if (cnxFactName == null)
136       throw new IllegalArgumentException JavaDoc("Missing ConnectionFactory JNDI name.");
137
138     destName = prop.getProperty("destinationName");
139     if (destName == null)
140       throw new IllegalArgumentException JavaDoc("Missing Destination JNDI name.");
141
142     String JavaDoc userName = prop.getProperty("userName");
143     String JavaDoc password = prop.getProperty("password");
144
145     if (userName != null && password != null) {
146       this.userName = userName;
147       this.password = password;
148     }
149
150     clientID = prop.getProperty("clientId");
151     selector = prop.getProperty("selector");
152     automaticRequest = Boolean.valueOf(
153           prop.getProperty("automaticRequest","false")).booleanValue();
154   }
155
156   /**
157    * Launches the connection process to the foreign JMS server.
158    *
159    * @exception javax.jms.IllegalStateException If the module can't access
160    * the foreign JMS server.
161    * @exception javax.jms.JMSException If the needed JMS resources can't be
162    * created.
163    */

164   public void connect() throws JMSException JavaDoc {
165     if (! usable)
166       throw new IllegalStateException JavaDoc(notUsableMessage);
167
168     listener = false;
169     // Creating the module's daemons.
170
consumerDaemon = new ConsumerDaemon();
171     reconnectionDaemon = new ReconnectionDaemon();
172
173     // Administered objects have not been retrieved: launching the startup
174
// daemon.
175
if (cnxFact == null || dest == null) {
176       StartupDaemon startup = new StartupDaemon();
177       startup.start();
178     }
179     // Administered objects have been retrieved: connecting.
180
else {
181       try {
182         doConnect();
183       }
184       catch (JMSException JavaDoc exc) {
185         reconnectionDaemon.reconnect();
186       }
187     }
188   }
189
190   /**
191    * Sets a message listener on the foreign JMS destination.
192    *
193    * @exception javax.jms.IllegalStateException If the module state does
194    * not allow to set a listener.
195    */

196   public void setMessageListener() throws IllegalStateException JavaDoc {
197     if (! usable)
198       throw new IllegalStateException JavaDoc(notUsableMessage);
199
200     listener = true;
201     try {
202       setConsumer();
203       consumer.setMessageListener(this);
204       cnx.start();
205     } catch (JMSException JavaDoc exc) {}
206   }
207
208   /**
209    * Unsets the set message listener on the foreign JMS destination.
210    */

211   public void unsetMessageListener() {
212     try {
213       cnx.stop();
214       consumer.setMessageListener(null);
215       unsetConsumer();
216     } catch (JMSException JavaDoc exc) {}
217     listener = false;
218   }
219
220   /**
221    * Synchronous method requesting an immediate delivery from the foreign
222    * JMS destination.
223    *
224    * @return The JMS message formatted into a JORAM MOM message, or
225    * <code>null</code> if no message is available or if the request
226    * fails.
227    *
228    * @exception javax.jms.IllegalStateException If the module state does
229    * not allow to request a message.
230    */

231   public Message receiveNoWait() throws IllegalStateException JavaDoc {
232     if (! usable)
233       throw new IllegalStateException JavaDoc(notUsableMessage);
234
235     Message momMessage = null;
236     try {
237       setConsumer();
238       cnx.start();
239       try {
240         org.objectweb.joram.client.jms.Message clientMessage =
241           org.objectweb.joram.client.jms.Message.convertJMSMessage(consumer.receiveNoWait());
242         momMessage = clientMessage.getMomMsg();
243         consumerSession.commit();
244       }
245       // Conversion error: denying the message.
246
catch (MessageFormatException JavaDoc exc) {
247         consumerSession.rollback();
248       }
249     }
250     // Connection start, or session commit/rollback failed:
251
// setting the message to null.
252
catch (JMSException JavaDoc commitExc) {
253       momMessage = null;
254     }
255     return momMessage;
256   }
257
258   /**
259    * Asynchronous method requesting a delivery from the foreign
260    * JMS destination.
261    *
262    * @exception javax.jms.IllegalStateException If the module state does
263    * not allow to request a message.
264    */

265   public void receive() throws IllegalStateException JavaDoc {
266     if (! usable)
267       throw new IllegalStateException JavaDoc(notUsableMessage);
268
269     consumerDaemon.receive();
270   }
271   
272   /**
273    * Sends a message to the foreign JMS destination.
274    *
275    * @exception javax.jms.IllegalStateException If the module's state does
276    * not permit message sendings.
277    * @exception javax.jms.MessageFormatException If the MOM message could not
278    * be converted into a foreign JMS message.
279    */

280   public void send(org.objectweb.joram.shared.messages.Message message)
281               throws JMSException JavaDoc {
282     if (! usable)
283       throw new IllegalStateException JavaDoc(notUsableMessage);
284
285     try {
286       producer.send(org.objectweb.joram.client.jms.Message.wrapMomMessage(null, message));
287       acknowledge(message);
288     } catch (javax.jms.JMSException JavaDoc exc) {
289       // Connection failure? Keeping the message for later delivery.
290
qout.add(message);
291     }
292   }
293
294   /**
295    * Interrupts the daemons and closes the connection.
296    */

297   public void close()
298   {
299     try {
300       cnx.stop();
301     }
302     catch (JMSException JavaDoc exc) {}
303
304     unsetMessageListener();
305
306     try {
307       consumerDaemon.interrupt();
308     }
309     catch (Exception JavaDoc exc) {}
310     try {
311       reconnectionDaemon.interrupt();
312     }
313     catch (Exception JavaDoc exc) {}
314
315     try {
316       cnx.close();
317     }
318     catch (JMSException JavaDoc exc) {}
319   }
320
321   /**
322    * Implements the <code>javax.jms.ExceptionListener</code> interface for
323    * catching the failures of the connection to the remote JMS server.
324    * <p>
325    * Reacts by launching a reconnection process.
326    */

327   public void onException(JMSException JavaDoc exc)
328   {
329     reconnectionDaemon.reconnect();
330   }
331
332   /**
333    * Implements the <code>javax.jms.MessageListener</code> interface for
334    * processing the asynchronous deliveries coming from the foreign JMS
335    * server.
336    */

337   public void onMessage(javax.jms.Message JavaDoc jmsMessage)
338   {
339     try {
340       try {
341         org.objectweb.joram.client.jms.Message clientMessage =
342           org.objectweb.joram.client.jms.Message.convertJMSMessage(jmsMessage);
343         Message momMessage = clientMessage.getMomMsg();
344         consumerSession.commit();
345         Channel.sendTo(agentId, new BridgeDeliveryNot(momMessage));
346       }
347       // Conversion error: denying the message.
348
catch (MessageFormatException JavaDoc conversionExc) {
349         consumerSession.rollback();
350       }
351     }
352     // Commit or rollback failed: nothing to do.
353
catch (JMSException JavaDoc exc) {}
354   }
355
356   /**
357    * Opens a connection with the foreign JMS server and creates the
358    * JMS resources for interacting with the foreign JMS destination.
359    *
360    * @exception JMSException If the needed JMS resources could not be created.
361    */

362   protected void doConnect() throws JMSException JavaDoc {
363     if (userName != null && password != null)
364       cnx = cnxFact.createConnection(userName, password);
365     else
366       cnx = cnxFact.createConnection();
367     cnx.setExceptionListener(this);
368
369     if (clientID != null)
370       cnx.setClientID(clientID);
371
372     producerSession = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
373     producer = producerSession.createProducer(dest);
374
375     consumerSession = cnx.createSession(true, 0);
376   }
377
378   /**
379    * Sets the JMS consumer on the foreign destination.
380    *
381    * @exception JMSException If the JMS consumer could not be created.
382    */

383   protected void setConsumer() throws JMSException JavaDoc {
384     if (consumer != null)
385       return;
386
387     try {
388       if (dest instanceof Queue JavaDoc)
389         consumer = consumerSession.createConsumer(dest, selector);
390       else
391         consumer = consumerSession.createDurableSubscriber((Topic JavaDoc) dest,
392                                                            agentId.toString(),
393                                                            selector,
394                                                            false);
395     }
396     catch (JMSException JavaDoc exc) {
397       throw exc;
398     }
399     catch (Exception JavaDoc exc) {
400       throw new JMSException JavaDoc("JMS resources do not allow to create consumer: "
401                              + exc);
402     }
403   }
404
405   /**
406    * Unsets the JMS consumer.
407    */

408   protected void unsetConsumer() {
409     try {
410       if (dest instanceof Topic JavaDoc)
411         consumerSession.unsubscribe(agentId.toString());
412
413       consumer.close();
414     }
415     catch (Exception JavaDoc exc) {}
416
417     consumer = null;
418   }
419
420   /**
421    * Acknowledges a message successfuly delivered to the foreign JMS server.
422    */

423   protected void acknowledge(Message message)
424   {
425     Channel.sendTo(agentId, new BridgeAckNot(message.id));
426   }
427
428
429   /**
430    * The <code>StartupDaemon</code> thread is responsible for retrieving
431    * the needed JMS administered objects from the JNDI server.
432    */

433   protected class StartupDaemon extends Daemon {
434     /** Constructs a <code>StartupDaemon</code> thread. */
435     protected StartupDaemon() {
436       super(agentId.toString() + ":StartupDaemon");
437       setDaemon(false);
438     }
439
440     /** The daemon's loop. */
441     public void run() {
442       javax.naming.Context JavaDoc jndiCtx = null;
443       try {
444         canStop = true;
445
446         // Administered objects still to be retrieved: getting them from
447
// JNDI.
448
if (cnxFact == null || dest == null) {
449           if (jndiFactory == null || jndiUrl == null)
450             jndiCtx = new javax.naming.InitialContext JavaDoc();
451           else {
452             java.util.Hashtable JavaDoc env = new java.util.Hashtable JavaDoc();
453             env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
454             env.put(javax.naming.Context.PROVIDER_URL, jndiUrl);
455             jndiCtx = new javax.naming.InitialContext JavaDoc(env);
456           }
457           cnxFact = (ConnectionFactory JavaDoc) jndiCtx.lookup(cnxFactName);
458           dest = (Destination JavaDoc) jndiCtx.lookup(destName);
459           
460           if (dest instanceof Topic JavaDoc)
461             automaticRequest = false;
462         }
463         try {
464           doConnect();
465         }
466         catch (AbstractMethodError JavaDoc exc) {
467           usable = false;
468           notUsableMessage = "Retrieved administered objects types not "
469                            + "compatible with the 'unified' communication "
470                            + " mode: " + exc;
471         }
472         catch (ClassCastException JavaDoc exc) {
473           usable = false;
474           notUsableMessage = "Retrieved administered objects types not "
475                            + "compatible with the chosen communication mode: "
476                            + exc;
477         }
478         catch (JMSSecurityException JavaDoc exc) {
479           usable = false;
480           notUsableMessage = "Provided user identification does not allow "
481                            + "to connect to the foreign JMS server: "
482                            + exc;
483         }
484         catch (JMSException JavaDoc exc) {
485           reconnectionDaemon.reconnect();
486         }
487         catch (Throwable JavaDoc exc) {
488           usable = false;
489           notUsableMessage = "" + exc;
490         }
491       }
492       catch (javax.naming.NameNotFoundException JavaDoc exc) {
493         usable = false;
494         if (cnxFact == null)
495           notUsableMessage = "Could not retrieve ConnectionFactory ["
496                              + cnxFactName
497                              + "] from JNDI: " + exc;
498         else if (dest == null)
499           notUsableMessage = "Could not retrieve Destination ["
500                              + destName
501                              + "] from JNDI: " + exc;
502       }
503       catch (javax.naming.NamingException JavaDoc exc) {
504         usable = false;
505         notUsableMessage = "Could not access JNDI: " + exc;
506       }
507       catch (ClassCastException JavaDoc exc) {
508         usable = false;
509         notUsableMessage = "Error while retrieving administered objects "
510                            + "through JNDI possibly because of missing "
511                            + "foreign JMS client libraries in classpath: "
512                            + exc;
513       }
514       catch (Exception JavaDoc exc) {
515         usable = false;
516         notUsableMessage = "Error while retrieving administered objects "
517                            + "through JNDI: "
518                            + exc;
519       }
520       finally {
521         // Closing the JNDI context.
522
try {
523           jndiCtx.close();
524         }
525         catch (Exception JavaDoc exc) {}
526
527         finish();
528       }
529     }
530
531     /** Shuts the daemon down. */
532     public void shutdown()
533     {}
534
535     /** Releases the daemon's resources. */
536     public void close()
537     {}
538   }
539
540   /**
541    * The <code>ReconnectionDaemon</code> thread is responsible for reconnecting
542    * the bridge module with the foreign JMS server in case of disconnection.
543    */

544   protected class ReconnectionDaemon extends Daemon
545   {
546     /** Number of reconnection trials of the first step. */
547     private int attempts1 = 30;
548     /** Retry interval (in milliseconds) of the first step. */
549     private long interval1 = 1000L;
550     /** Number of reconnection trials of the second step. */
551     private int attempts2 = 55;
552     /** Retry interval (in milliseconds) of the second step. */
553     private long interval2 = 5000L;
554     /** Retry interval (in milliseconds) of the third step. */
555     private long interval3 = 60000L;
556
557     /** Constructs a <code>ReconnectionDaemon</code> thread. */
558     protected ReconnectionDaemon()
559     {
560       super(agentId.toString() + ":ReconnectionDaemon");
561       setDaemon(false);
562     }
563
564     /** Notifies the daemon to start reconnecting. */
565     protected void reconnect() {
566       if (running)
567         return;
568
569       consumer = null;
570       start();
571     }
572
573     /** The daemon's loop. */
574     public void run()
575     {
576       int attempts = 0;
577       long interval;
578       Message msg;
579
580       try {
581         while (running) {
582           canStop = true;
583
584           attempts++;
585
586           if (attempts <= 30)
587             interval = interval1;
588           else if (attempts <= 55)
589             interval = interval2;
590           else
591             interval = interval3;
592
593           try {
594             Thread.sleep(interval);
595             doConnect();
596             
597             // Setting the listener, if any.
598
if (listener)
599               setMessageListener();
600             // Starting the consumer daemon:
601
consumerDaemon.start();
602             // Sending the pending messages, if any:
603
while (! qout.isEmpty())
604               send((Message) qout.remove(0));
605           }
606           catch (Exception JavaDoc exc) {
607             continue;
608           }
609           canStop = false;
610           break;
611         }
612       }
613       finally {
614         finish();
615       }
616     }
617
618     /** Shuts the daemon down. */
619     public void shutdown()
620     {}
621
622     /** Releases the daemon's resources. */
623     public void close()
624     {}
625   }
626
627   /**
628    * The <code>ConsumerDaemon</code> thread allows to call
629    * <code>MessageConsumer.receive()</code> for requesting a foreign JMS
630    * message without blocking the JORAM server.
631    */

632   protected class ConsumerDaemon extends Daemon {
633     /** Counter of pending "receive" requests. */
634     private int requests = 0;
635
636
637     /** Constructs a <code>ReceiverDaemon</code> thread. */
638     protected ConsumerDaemon() {
639       super(agentId.toString() + ":ConsumerDaemon");
640       setDaemon(false);
641     }
642
643     /** Notifies the daemon of a new "receive" request. */
644     protected synchronized void receive() {
645       requests++;
646
647       if (running)
648         return;
649
650       start();
651     }
652
653     /** The daemon's loop. */
654     public void run() {
655       try {
656         Message momMessage;
657         BridgeDeliveryNot notif;
658
659         setConsumer();
660         cnx.start();
661         while ((requests > 0 || automaticRequest) && running) {
662           canStop = true;
663           // Expecting a message:
664
try {
665             org.objectweb.joram.client.jms.Message clientMessage =
666               org.objectweb.joram.client.jms.Message.convertJMSMessage(consumer.receive());
667             momMessage = clientMessage.getMomMsg();
668             consumerSession.commit();
669           }
670           // Conversion error: denying the message.
671
catch (MessageFormatException JavaDoc messageExc) {
672             consumerSession.rollback();
673             continue;
674           }
675           // Processing the delivery.
676
canStop = false;
677           notif = new BridgeDeliveryNot(momMessage);
678           Channel.sendTo(agentId, notif);
679           if (!automaticRequest)
680             requests--;
681         }
682       }
683       // Connection loss?
684
catch (JMSException JavaDoc exc) {}
685       finally {
686         finish();
687       }
688     }
689
690     /** Shuts the daemon down. */
691     public void shutdown()
692     {}
693
694     /** Releases the daemon's resources. */
695     public void close()
696     {}
697   }
698
699
700   /** Deserializes a <code>BridgeUnifiedModule</code> instance. */
701   private void readObject(java.io.ObjectInputStream JavaDoc in)
702                throws java.io.IOException JavaDoc, ClassNotFoundException JavaDoc
703   {
704     in.defaultReadObject();
705     qout = new Vector JavaDoc();
706   }
707 }
708
Popular Tags