KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > JMS


1 // $Id: JMS.java,v 1.11 2005/04/15 13:17:02 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.Address;
6 import org.jgroups.Event;
7 import org.jgroups.Message;
8 import org.jgroups.View;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.util.Util;
11
12 import javax.naming.Context JavaDoc;
13 import javax.naming.InitialContext JavaDoc;
14 import java.io.*;
15 import java.util.Hashtable JavaDoc;
16 import java.util.Properties JavaDoc;
17 import java.util.Vector JavaDoc;
18
19 /**
20  * Implementation of the transport protocol using the Java Message Service (JMS).
21  * This implementation depends on the JMS server that will distribute messages
22  * published to the specific topic to all topic subscribers.
23  * <p>
24  * Protocol parameters are:
25  * <ul>
26  * <li><code>topicName</code> - (required), full JNDI name of the topic to be
27  * used for message publishing;
28  *
29  * <li><code>cf</code> - (optional), full JNDI name of the topic connection
30  * factory that will create topic connection, default value is
31  * <code>"ConnectionFactory"</code>;
32  *
33  * <li><code>jndiCtx</code> - (optional), value of the
34  * <code>javax.naming.Context.INITIAL_CONTEXT_FACTORY</code> property; you can
35  * specify it as the JVM system property
36  * <code>-Djava.naming.factory.initial=factory.class.Name</code>;
37  *
38  * <li><code>providerURL</code> - (optional), value of the
39  * <code>javax.naming.Context.PROVIDER_URL</code> property; you can specify it
40  * as the JVM system property <code>-Djava.naming.provider.url=some_url</code>
41  *
42  * <li><code>ttl</code> - (required), time to live in milliseconds. Default
43  * value is 0, that means that messages will never expire and will be
44  * accumulated by a JMS server.
45  *
46  * </ul>
47  *
48  * <p>
49  * Note, when you are using the JMS protocol, try to avoid using protocols
50  * that open server socket connections, like FD_SOCK. I belive that FD is more
51  * appropriate failure detector for JMS case.
52  *
53  * @author Roman Rokytskyy (rrokytskyy@acm.org)
54  */

55 public class JMS extends Protocol implements javax.jms.MessageListener JavaDoc {
56
57     public static final
58         String JavaDoc DEFAULT_CONNECTION_FACTORY = "ConnectionFactory";
59
60     public static final
61         String JavaDoc INIT_CONNECTION_FACTORY = "cf";
62
63     public static final
64         String JavaDoc INIT_TOPIC_NAME = "topicName";
65
66     public static final
67         String JavaDoc INIT_JNDI_CONTEXT = "jndiCtx";
68
69     public static final
70         String JavaDoc INIT_PROVIDER_URL = "providerURL";
71         
72     public static final
73         String JavaDoc TIME_TO_LIVE = "ttl";
74
75     public static final
76         String JavaDoc GROUP_NAME_PROPERTY = "jgroups_group_name";
77
78     public static final
79         String JavaDoc SRC_PROPERTY = "src";
80
81     public static final
82         String JavaDoc DEST_PROPERTY = "dest";
83
84     private final Vector JavaDoc members = new Vector JavaDoc();
85
86     private javax.jms.TopicConnectionFactory JavaDoc connectionFactory;
87     private javax.jms.Topic JavaDoc topic;
88
89     private javax.jms.TopicConnection JavaDoc connection;
90
91     private javax.jms.TopicSession JavaDoc session;
92     private javax.jms.TopicPublisher JavaDoc publisher;
93     private javax.jms.TopicSubscriber JavaDoc subscriber;
94
95     private String JavaDoc cfName;
96     private String JavaDoc topicName;
97     private String JavaDoc initCtxFactory;
98     private String JavaDoc providerUrl;
99     private long timeToLive;
100
101     private Context JavaDoc ctx;
102
103     private String JavaDoc group_addr;
104     private Address local_addr;
105     private Address mcast_addr;
106
107     private final ByteArrayOutputStream out_stream = new ByteArrayOutputStream(65535);
108     
109     private static final java.util.Random JavaDoc RND = new java.util.Random JavaDoc();
110
111     /**
112      * Empty constructor.
113      */

114     public JMS() {
115     }
116
117     /**
118      * Get the name of the protocol.
119      *
120      * @return always returns the <code>"JMS"</code> string.
121      */

122     public String JavaDoc getName() {
123         return "JMS";
124     }
125
126     /**
127      * Get the string representation of the protocol.
128      *
129      * @return string representation of the protocol (not very useful though).
130      */

131     public String JavaDoc toString() {
132         return "Protocol JMS(local address: " + local_addr + ')';
133     }
134
135     /**
136      * Set protocol properties. Properties are:
137      * <ul>
138      * <li><code>topicName</code> - (required), full JNDI name of the topic to be
139      * used for message publishing;
140      *
141      * <li><code>cf</code> - (optional), full JNDI name of the topic connection
142      * factory that will create topic connection, default value is
143      * <code>"ConnectionFactory"</code>;
144      *
145      * <li><code>jndiCtx</code> - (optional), value of the
146      * <code>javax.naming.Context.INITIAL_CONTEXT_FACTORY</code> property; you can
147      * specify it as the JVM system property
148      * <code>-Djava.naming.factory.initial=factory.class.Name</code>;
149      *
150      * <li><code>providerURL</code> - (optional), value of the
151      * <code>javax.naming.Context.PROVIDER_URL</code> property; you can specify it
152      * as the JVM system property <code>-Djava.naming.provider.url=some_url</code>
153      * </ul>
154      *
155      */

156     public boolean setProperties(Properties JavaDoc props) {
157         super.setProperties(props);
158         cfName = props.getProperty(INIT_CONNECTION_FACTORY,
159                 DEFAULT_CONNECTION_FACTORY);
160
161         props.remove(INIT_CONNECTION_FACTORY);
162
163         topicName = props.getProperty(INIT_TOPIC_NAME);
164
165         if (topicName == null)
166                 throw new IllegalArgumentException JavaDoc(
167                 "JMS topic has not been specified.");
168
169         props.remove(INIT_TOPIC_NAME);
170
171         initCtxFactory = props.getProperty(INIT_JNDI_CONTEXT);
172         props.remove(INIT_JNDI_CONTEXT);
173
174         providerUrl = props.getProperty(INIT_PROVIDER_URL);
175         props.remove(INIT_PROVIDER_URL);
176         
177         String JavaDoc ttl = props.getProperty(TIME_TO_LIVE);
178         
179         if (ttl == null) {
180             if(log.isErrorEnabled()) log.error("ttl property not found.");
181             return false;
182         }
183         
184         props.remove(TIME_TO_LIVE);
185         
186         // try to parse ttl property
187
try {
188             
189             timeToLive = Long.parseLong(ttl);
190             
191         } catch(NumberFormatException JavaDoc nfex) {
192             if(log.isErrorEnabled()) log.error("ttl property does not contain numeric value.");
193             
194             return false;
195         }
196
197         return props.size() == 0;
198     }
199
200
201
202
203     /**
204      * Implementation of the <code>javax.jms.MessageListener</code> interface.
205      * This method receives the JMS message, checks the destination group name.
206      * If the group name is the same as the group name of this channel, it
207      * checks the destination address. If destination address is either
208      * multicast or is the same as local address then message is unwrapped and
209      * passed up the protocol stack. Otherwise it is ignored.
210      *
211      * @param jmsMessage instance of <code>javax.jms.Message</code>.
212      */

213     public void onMessage(javax.jms.Message JavaDoc jmsMessage) {
214         try {
215             String JavaDoc groupName = jmsMessage.getStringProperty(GROUP_NAME_PROPERTY);
216
217             // there might be other messages in this topic
218
if (groupName == null)
219                 return;
220             
221
222                 if(log.isDebugEnabled()) log.debug("Got message for group [" +
223                 groupName + ']' + ", my group is [" + group_addr + ']');
224
225             // not our message, ignore it
226
if (!group_addr.equals(groupName))
227                 return;
228
229             JMSAddress src =
230                 jmsMessage.getStringProperty(SRC_PROPERTY) != null ?
231                     new JMSAddress(jmsMessage.getStringProperty(SRC_PROPERTY)) :
232                     null;
233
234             JMSAddress dest =
235                 jmsMessage.getStringProperty(DEST_PROPERTY) != null ?
236                     new JMSAddress(jmsMessage.getStringProperty(DEST_PROPERTY)) :
237                     null;
238
239             // if message is unicast message and I'm not the destination - ignore
240
if (src != null && dest != null &&
241                 !dest.equals(local_addr) && !dest.isMulticastAddress())
242                 return;
243             
244
245             if (jmsMessage instanceof javax.jms.ObjectMessage JavaDoc) {
246                 byte[] buf = (byte[])((javax.jms.ObjectMessage JavaDoc)jmsMessage).getObject();
247
248                 ByteArrayInputStream inp_stream=new ByteArrayInputStream(buf);
249                 ObjectInputStream inp=new ObjectInputStream(inp_stream);
250                     
251                 Message msg=new Message();
252                 msg.readExternal(inp);
253                     
254                 Event evt=new Event(Event.MSG, msg);
255
256                  // +++ remove
257
if(log.isDebugEnabled()) log.debug("Message is " + msg +
258                         ", headers are " + msg.getHeaders ());
259
260                 /* Because Protocol.Up() is never called by this bottommost layer,
261                  * we call Up() directly in the observer. This allows e.g.
262                  * PerfObserver to get the time of reception of a message */

263                 if(observer != null)
264                     observer.up(evt, up_queue.size());
265
266                 passUp(evt);
267             }
268         } catch(javax.jms.JMSException JavaDoc ex) {
269             ex.printStackTrace();
270             if(log.isErrorEnabled()) log.error("JMSException : " + ex.toString());
271         } catch(IOException ioex) {
272             ioex.printStackTrace();
273             if(log.isErrorEnabled()) log.error("IOException : " + ioex.toString());
274         } catch(ClassNotFoundException JavaDoc cnfex) {
275                 cnfex.printStackTrace();
276                 if(log.isErrorEnabled()) log.error("ClassNotFoundException : " + cnfex.toString());
277         }
278     }
279
280     /**
281      * Handle down event, if it is not a Event.MSG type.
282      *
283      * @param evt event to handle.
284      */

285     protected void handleDownEvent(Event evt) {
286         switch(evt.getType()) {
287
288             // we do not need this at present time, maybe in the future
289
case Event.TMP_VIEW:
290             case Event.VIEW_CHANGE:
291                 synchronized(members) {
292                         members.removeAllElements();
293                         Vector JavaDoc tmpvec=((View)evt.getArg()).getMembers();
294                         for(int i=0; i < tmpvec.size(); i++)
295                         members.addElement(tmpvec.elementAt(i));
296                 }
297                 break;
298
299             case Event.GET_LOCAL_ADDRESS:
300                 // return local address -> Event(SET_LOCAL_ADDRESS, local)
301
passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
302                 break;
303
304             case Event.CONNECT:
305                 group_addr=(String JavaDoc)evt.getArg();
306                 passUp(new Event(Event.CONNECT_OK));
307                 break;
308
309             case Event.DISCONNECT:
310                 passUp(new Event(Event.DISCONNECT_OK));
311                 break;
312         }
313     }
314
315     /**
316      * Called by the protocol above this. We check the event type, and if it is
317      * message, we publish it in the topic, otherwise we let the
318      * {@link #handleDownEvent(Event)} take care of it.
319      *
320      * @param evt event to process.
321      */

322     public void down(Event evt) {
323
324             if(log.isInfoEnabled()) log.info("event is " + evt + ", group_addr=" +
325                 group_addr + ", time=" + System.currentTimeMillis() +
326                 ", hdrs are " + Util.printEvent(evt));
327
328         // handle all non-message events
329
if(evt.getType() != Event.MSG) {
330                 handleDownEvent(evt);
331                 return;
332         }
333
334         // extract message
335
Message msg=(Message)evt.getArg();
336
337         // Because we don't call Protocol.passDown(), we notify the observer
338
// directly (e.g. PerfObserver).
339
// This way, we still have performance numbers for UDP
340
if(observer != null)
341                 observer.passDown(evt);
342
343         // publish the message to the topic
344
sendMessage(msg);
345
346     }
347
348
349
350     /**
351      * Publish message in the JMS topic. We set the message source and
352      * destination addresses if they were <code>null</code>.
353      *
354      * @param msg message to publish.
355      */

356     protected void sendMessage(Message msg) {
357         try {
358             if (msg.getSrc() == null)
359                     msg.setSrc(local_addr);
360
361             if (msg.getDest() == null)
362                     msg.setDest(mcast_addr);
363
364
365                     if(log.isInfoEnabled()) log.info("msg is " + msg);
366
367             // convert the message into byte array.
368
out_stream.reset();
369
370             ObjectOutputStream out=new ObjectOutputStream(out_stream);
371             msg.writeExternal(out);
372             out.flush();
373             
374             byte[] buf = out_stream.toByteArray();
375
376             javax.jms.ObjectMessage JavaDoc jmsMessage = session.createObjectMessage();
377             
378             // set the payload
379
jmsMessage.setObject(buf);
380             
381             // set the group name
382
jmsMessage.setStringProperty(GROUP_NAME_PROPERTY, group_addr);
383
384             // if the source is JMSAddress, copy it to the header
385
if (msg.getSrc() instanceof JMSAddress)
386                     jmsMessage.setStringProperty(
387                             SRC_PROPERTY, msg.getSrc().toString());
388
389             // if the destination is JMSAddress, copy it to the header
390
if (msg.getDest() instanceof JMSAddress)
391                     jmsMessage.setStringProperty(
392                             DEST_PROPERTY, msg.getDest().toString());
393
394             // publish message
395
publisher.publish(jmsMessage);
396                 
397         } catch(javax.jms.JMSException JavaDoc ex) {
398                 if(log.isErrorEnabled()) log.error("JMSException : " + ex.toString());
399         } catch(IOException ioex) {
400                 if(log.isErrorEnabled()) log.error("IOException : " + ioex.toString());
401         }
402     }
403
404     /**
405      * Start the JMS protocol. This method instantiates the JNDI initial context
406      * and looks up the topic connection factory and topic itself. If this step
407      * is successful, it creates a connection to JMS server, opens a session
408      * and obtains publisher and subscriber instances.
409      *
410      * @throws javax.jms.JMSException if something goes wrong with JMS.
411      * @throws javax.naming.NamingException if something goes wrong with JNDI.
412      * @throws IllegalArgumentException if the connection factory or topic
413      * cannot be found under specified names.
414      */

415     public void start() throws Exception JavaDoc
416     {
417         if (initCtxFactory != null && providerUrl != null) {
418             Hashtable JavaDoc env = new Hashtable JavaDoc();
419             env.put(Context.INITIAL_CONTEXT_FACTORY, initCtxFactory);
420             env.put(Context.PROVIDER_URL, providerUrl);
421
422             ctx = new InitialContext JavaDoc(env);
423         } else
424             ctx = new InitialContext JavaDoc();
425
426         connectionFactory = (javax.jms.TopicConnectionFactory JavaDoc)ctx.lookup(cfName);
427         
428         if (connectionFactory == null)
429             throw new IllegalArgumentException JavaDoc(
430                     "Topic connection factory cannot be found in JNDI.");
431         
432         topic = (javax.jms.Topic JavaDoc)ctx.lookup(topicName);
433         
434         if (topic == null)
435             throw new IllegalArgumentException JavaDoc("Topic cannot be found in JNDI.");
436
437         connection = connectionFactory.createTopicConnection();
438
439         boolean addressAssigned = false;
440         
441         // check if JMS connection contains client ID,
442
// if not, try to assign randomly generated one
443
/*while(!addressAssigned) {
444             if (connection.getClientID() != null)
445                 addressAssigned = true;
446             else
447                 try {
448                     connection.setClientID(generateLocalAddress());
449                     addressAssigned = true;
450                 } catch(javax.jms.InvalidClientIDException ex) {
451                     // duplicate... ok, let's try again
452                 }
453         }*/

454
455
456         // Patch below submitted by Greg Woolsey
457
// Check if JMS connection contains client ID, if not, try to assign randomly generated one
458
// setClientID() must be the first method called on a new connection, per the JMS spec.
459
// If the client ID is already set, this will throw IllegalStateException and keep the original value.
460
while(!addressAssigned) {
461             try {
462                 connection.setClientID(generateLocalAddress());
463                 addressAssigned = true;
464             } catch (javax.jms.IllegalStateException JavaDoc e) {
465                 // expected if connection already has a client ID.
466
addressAssigned = true;
467             } catch(javax.jms.InvalidClientIDException JavaDoc ex) {
468                 // duplicate... OK, let's try again
469
}
470         }
471
472         local_addr = new JMSAddress(connection.getClientID(), false);
473         mcast_addr = new JMSAddress(topicName, true);
474
475         session = connection.createTopicSession(false,
476                                                 javax.jms.Session.AUTO_ACKNOWLEDGE);
477
478         publisher = session.createPublisher(topic);
479         publisher.setTimeToLive(timeToLive);
480         
481         subscriber = session.createSubscriber(topic);
482         subscriber.setMessageListener(this);
483         
484         connection.start();
485
486         passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
487     }
488
489     /**
490      * Stops the work of the JMS protocol. This method closes JMS session and
491      * connection and deregisters itself from the message notification.
492      */

493     public void stop() {
494
495             if(log.isInfoEnabled()) log.info("finishing JMS transport layer.");
496
497         try {
498             connection.stop();
499             subscriber.setMessageListener(null);
500             session.close();
501             connection.close();
502         }
503         catch(Throwable JavaDoc ex) {
504             if(log.isErrorEnabled()) log.error("exception is " + ex);
505         }
506     }
507     
508     /**
509      * Generate random local address. This method takes host name and appends
510      * it with randomly generated integer.
511      *
512      * @return randomly generated local address.
513      */

514     protected String JavaDoc generateLocalAddress() throws java.net.UnknownHostException JavaDoc {
515         String JavaDoc hostName = java.net.InetAddress.getLocalHost().getHostName();
516        
517         int rndPort = RND.nextInt(65535);
518         
519         return hostName + ':' + rndPort;
520     }
521
522     /**
523      * Simple {@link Address} representing the JMS node ID or JMS topic group.
524      */

525     protected static class JMSAddress implements Address {
526         private String JavaDoc address;
527         private boolean isMCast;
528
529         /**
530          * Create instance of this class for given address string.
531          *
532          * Current implementation uses a hash mark <code>'#'</code> to determine
533          * if the address is a unicast or multicast. Therefore, this character is
534          * considered as reserved and is not allowed in the <code>address</code>
535          * parameter passed to the {@link #JMSAddress(String, boolean)}
536          * constructor.
537          *
538          * @param address string representing the address of the node connected
539          * to the JMS topic, usually, a value of
540          * <code>connection.getClientID()</code>, where the connection is
541          * instance of <code>javax.jms.TopicConnection</code>.
542          *
543          * @param isMCast <code>true</code> if the address is multicast address,
544          * otherwise - <code>false</code>.
545          */

546         public JMSAddress(String JavaDoc address, boolean isMCast) {
547             this.address = address;
548             this.isMCast = isMCast;
549         }
550
551         /**
552          * Empty constructor to allow externalization work.
553          */

554         public JMSAddress() {
555         }
556
557         /**
558          * Reconstruct the address from the string representation. If the
559          * <code>str</code> starts with <code>'#'</code>, address is considered
560          * as unicast, and node address is the substring after <code>'#'</code>.
561          * Otherwise, address is multicast and address is <code>str</code>
562          * itself.
563          *
564          * @param str string used to reconstruct the instance.
565          */

566         public JMSAddress(String JavaDoc str) {
567             if (str.startsWith("#")) {
568                 address = str.substring(1);
569                 isMCast = false;
570             } else {
571                 address = str;
572                 isMCast = true;
573             }
574         }
575
576         /**
577          * Get the node address.
578          *
579          * @return node address in the form passed to the constructor
580          * {@link #JMSAddress(String, boolean)}.
581          */

582         public String JavaDoc getAddress() { return address; }
583
584         /**
585          * Set the node address.
586          *
587          * @param address new node address.
588          */

589         public void setAddress(String JavaDoc address) { this.address = address; }
590
591         /**
592          * Is the address a multicast address?
593          *
594          * @return <code>true</code> if the address is multicast address.
595          */

596         public boolean isMulticastAddress() {
597             return isMCast;
598         }
599
600         public int size() {
601             return 22;
602         }
603
604         /**
605          * Clone the object.
606          */

607         protected Object JavaDoc clone() throws CloneNotSupportedException JavaDoc {
608             return new JMSAddress(address, isMCast);
609         }
610
611         /**
612          * Compare this object to <code>o</code>. It is possible to compare only
613          * addresses of the same class. Also they both should be either
614          * multicast or unicast addresses.
615          *
616          * @return value compliant with the {@link Comparable#compareTo(Object)}
617          * specififaction.
618          */

619         public int compareTo(Object JavaDoc o) throws ClassCastException JavaDoc {
620             if (!(o instanceof JMSAddress))
621                 throw new ClassCastException JavaDoc("Cannot compare different classes.");
622
623             JMSAddress that = (JMSAddress)o;
624
625             if (that.isMCast != this.isMCast)
626                 throw new ClassCastException JavaDoc(
627                     "Addresses are different: one is multicast, and one is not");
628
629             return this.address.compareTo(that.address);
630         }
631
632         /**
633          * Test is this object is equal to <code>obj</code>.
634          *
635          * @return <code>true</code> iff the <code>obj</code> is
636          * <code>JMSAddress</code>, node addresses are equal and they both are
637          * either multicast or unicast addresses.
638          */

639         public boolean equals(Object JavaDoc obj) {
640             if (obj == this) return true;
641
642             if (!(obj instanceof JMSAddress))
643                     return false;
644
645             JMSAddress that = (JMSAddress)obj;
646
647             if (this.isMCast)
648                 return this.isMCast == that.isMCast;
649             else
650             if (this.address == null || that.address == null)
651                 return false;
652             else
653                 return this.address.equals(that.address) &&
654                     this.isMCast == that.isMCast;
655         }
656
657         /**
658          * Get the hash code of this address.
659          *
660          * @return hash code of this object.
661          */

662         public int hashCode() {
663             return toString().hashCode();
664         }
665
666         /**
667          * Read object from external input.
668          */

669         public void readExternal(ObjectInput in)
670             throws IOException, ClassNotFoundException JavaDoc
671         {
672             address = (String JavaDoc)in.readObject();
673             isMCast = in.readBoolean();
674         }
675
676         /**
677          * Get the string representation of the address. The following property
678          * holds: <code>a2.equals(a1)</code> is always <code>true</code>, where
679          * <code>a2</code> is
680          * <code>JMSAddress a2 = new JMSAddress(a1.toString());</code>
681          *
682          * @return string representation of the address.
683          */

684         public String JavaDoc toString() {
685             return !isMCast ? '#' + address : address;
686         }
687
688         /**
689          * Write the object to external output.
690          */

691         public void writeExternal(ObjectOutput out) throws IOException {
692             out.writeObject(address);
693             out.writeBoolean(isMCast);
694         }
695
696
697         public void writeTo(DataOutputStream outstream) throws IOException {
698             outstream.writeUTF(address);
699             outstream.writeBoolean(isMCast);
700         }
701
702         public void readFrom(DataInputStream instream) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
703             address=instream.readUTF();
704             isMCast=instream.readBoolean();
705         }
706     }
707
708 }
709
Popular Tags