1 3 package org.jgroups.protocols; 4 5 import org.jgroups.Address; 6 import org.jgroups.Event; 7 import org.jgroups.Message; 8 import org.jgroups.View; 9 import org.jgroups.stack.Protocol; 10 import org.jgroups.util.Util; 11 12 import javax.naming.Context ; 13 import javax.naming.InitialContext ; 14 import java.io.*; 15 import java.util.Hashtable ; 16 import java.util.Properties ; 17 import java.util.Vector ; 18 19 55 public class JMS extends Protocol implements javax.jms.MessageListener { 56 57 public static final 58 String DEFAULT_CONNECTION_FACTORY = "ConnectionFactory"; 59 60 public static final 61 String INIT_CONNECTION_FACTORY = "cf"; 62 63 public static final 64 String INIT_TOPIC_NAME = "topicName"; 65 66 public static final 67 String INIT_JNDI_CONTEXT = "jndiCtx"; 68 69 public static final 70 String INIT_PROVIDER_URL = "providerURL"; 71 72 public static final 73 String TIME_TO_LIVE = "ttl"; 74 75 public static final 76 String GROUP_NAME_PROPERTY = "jgroups_group_name"; 77 78 public static final 79 String SRC_PROPERTY = "src"; 80 81 public static final 82 String DEST_PROPERTY = "dest"; 83 84 private final Vector members = new Vector (); 85 86 private javax.jms.TopicConnectionFactory connectionFactory; 87 private javax.jms.Topic topic; 88 89 private javax.jms.TopicConnection connection; 90 91 private javax.jms.TopicSession session; 92 private javax.jms.TopicPublisher publisher; 93 private javax.jms.TopicSubscriber subscriber; 94 95 private String cfName; 96 private String topicName; 97 private String initCtxFactory; 98 private String providerUrl; 99 private long timeToLive; 100 101 private Context ctx; 102 103 private String group_addr; 104 private Address local_addr; 105 private Address mcast_addr; 106 107 private final ByteArrayOutputStream out_stream = new ByteArrayOutputStream(65535); 108 109 private static final java.util.Random RND = new java.util.Random (); 110 111 114 public JMS() { 115 } 116 117 122 public String getName() { 123 return "JMS"; 124 } 125 126 131 public String toString() { 132 return "Protocol JMS(local address: " + local_addr + ')'; 133 } 134 135 156 public boolean setProperties(Properties props) { 157 super.setProperties(props); 158 cfName = props.getProperty(INIT_CONNECTION_FACTORY, 159 DEFAULT_CONNECTION_FACTORY); 160 161 props.remove(INIT_CONNECTION_FACTORY); 162 163 topicName = props.getProperty(INIT_TOPIC_NAME); 164 165 if (topicName == null) 166 throw new IllegalArgumentException ( 167 "JMS topic has not been specified."); 168 169 props.remove(INIT_TOPIC_NAME); 170 171 initCtxFactory = props.getProperty(INIT_JNDI_CONTEXT); 172 props.remove(INIT_JNDI_CONTEXT); 173 174 providerUrl = props.getProperty(INIT_PROVIDER_URL); 175 props.remove(INIT_PROVIDER_URL); 176 177 String ttl = props.getProperty(TIME_TO_LIVE); 178 179 if (ttl == null) { 180 if(log.isErrorEnabled()) log.error("ttl property not found."); 181 return false; 182 } 183 184 props.remove(TIME_TO_LIVE); 185 186 try { 188 189 timeToLive = Long.parseLong(ttl); 190 191 } catch(NumberFormatException nfex) { 192 if(log.isErrorEnabled()) log.error("ttl property does not contain numeric value."); 193 194 return false; 195 } 196 197 return props.size() == 0; 198 } 199 200 201 202 203 213 public void onMessage(javax.jms.Message jmsMessage) { 214 try { 215 String groupName = jmsMessage.getStringProperty(GROUP_NAME_PROPERTY); 216 217 if (groupName == null) 219 return; 220 221 222 if(log.isDebugEnabled()) log.debug("Got message for group [" + 223 groupName + ']' + ", my group is [" + group_addr + ']'); 224 225 if (!group_addr.equals(groupName)) 227 return; 228 229 JMSAddress src = 230 jmsMessage.getStringProperty(SRC_PROPERTY) != null ? 231 new JMSAddress(jmsMessage.getStringProperty(SRC_PROPERTY)) : 232 null; 233 234 JMSAddress dest = 235 jmsMessage.getStringProperty(DEST_PROPERTY) != null ? 236 new JMSAddress(jmsMessage.getStringProperty(DEST_PROPERTY)) : 237 null; 238 239 if (src != null && dest != null && 241 !dest.equals(local_addr) && !dest.isMulticastAddress()) 242 return; 243 244 245 if (jmsMessage instanceof javax.jms.ObjectMessage ) { 246 byte[] buf = (byte[])((javax.jms.ObjectMessage )jmsMessage).getObject(); 247 248 ByteArrayInputStream inp_stream=new ByteArrayInputStream(buf); 249 ObjectInputStream inp=new ObjectInputStream(inp_stream); 250 251 Message msg=new Message(); 252 msg.readExternal(inp); 253 254 Event evt=new Event(Event.MSG, msg); 255 256 if(log.isDebugEnabled()) log.debug("Message is " + msg + 258 ", headers are " + msg.getHeaders ()); 259 260 263 if(observer != null) 264 observer.up(evt, up_queue.size()); 265 266 passUp(evt); 267 } 268 } catch(javax.jms.JMSException ex) { 269 ex.printStackTrace(); 270 if(log.isErrorEnabled()) log.error("JMSException : " + ex.toString()); 271 } catch(IOException ioex) { 272 ioex.printStackTrace(); 273 if(log.isErrorEnabled()) log.error("IOException : " + ioex.toString()); 274 } catch(ClassNotFoundException cnfex) { 275 cnfex.printStackTrace(); 276 if(log.isErrorEnabled()) log.error("ClassNotFoundException : " + cnfex.toString()); 277 } 278 } 279 280 285 protected void handleDownEvent(Event evt) { 286 switch(evt.getType()) { 287 288 case Event.TMP_VIEW: 290 case Event.VIEW_CHANGE: 291 synchronized(members) { 292 members.removeAllElements(); 293 Vector tmpvec=((View)evt.getArg()).getMembers(); 294 for(int i=0; i < tmpvec.size(); i++) 295 members.addElement(tmpvec.elementAt(i)); 296 } 297 break; 298 299 case Event.GET_LOCAL_ADDRESS: 300 passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 302 break; 303 304 case Event.CONNECT: 305 group_addr=(String )evt.getArg(); 306 passUp(new Event(Event.CONNECT_OK)); 307 break; 308 309 case Event.DISCONNECT: 310 passUp(new Event(Event.DISCONNECT_OK)); 311 break; 312 } 313 } 314 315 322 public void down(Event evt) { 323 324 if(log.isInfoEnabled()) log.info("event is " + evt + ", group_addr=" + 325 group_addr + ", time=" + System.currentTimeMillis() + 326 ", hdrs are " + Util.printEvent(evt)); 327 328 if(evt.getType() != Event.MSG) { 330 handleDownEvent(evt); 331 return; 332 } 333 334 Message msg=(Message)evt.getArg(); 336 337 if(observer != null) 341 observer.passDown(evt); 342 343 sendMessage(msg); 345 346 } 347 348 349 350 356 protected void sendMessage(Message msg) { 357 try { 358 if (msg.getSrc() == null) 359 msg.setSrc(local_addr); 360 361 if (msg.getDest() == null) 362 msg.setDest(mcast_addr); 363 364 365 if(log.isInfoEnabled()) log.info("msg is " + msg); 366 367 out_stream.reset(); 369 370 ObjectOutputStream out=new ObjectOutputStream(out_stream); 371 msg.writeExternal(out); 372 out.flush(); 373 374 byte[] buf = out_stream.toByteArray(); 375 376 javax.jms.ObjectMessage jmsMessage = session.createObjectMessage(); 377 378 jmsMessage.setObject(buf); 380 381 jmsMessage.setStringProperty(GROUP_NAME_PROPERTY, group_addr); 383 384 if (msg.getSrc() instanceof JMSAddress) 386 jmsMessage.setStringProperty( 387 SRC_PROPERTY, msg.getSrc().toString()); 388 389 if (msg.getDest() instanceof JMSAddress) 391 jmsMessage.setStringProperty( 392 DEST_PROPERTY, msg.getDest().toString()); 393 394 publisher.publish(jmsMessage); 396 397 } catch(javax.jms.JMSException ex) { 398 if(log.isErrorEnabled()) log.error("JMSException : " + ex.toString()); 399 } catch(IOException ioex) { 400 if(log.isErrorEnabled()) log.error("IOException : " + ioex.toString()); 401 } 402 } 403 404 415 public void start() throws Exception 416 { 417 if (initCtxFactory != null && providerUrl != null) { 418 Hashtable env = new Hashtable (); 419 env.put(Context.INITIAL_CONTEXT_FACTORY, initCtxFactory); 420 env.put(Context.PROVIDER_URL, providerUrl); 421 422 ctx = new InitialContext (env); 423 } else 424 ctx = new InitialContext (); 425 426 connectionFactory = (javax.jms.TopicConnectionFactory )ctx.lookup(cfName); 427 428 if (connectionFactory == null) 429 throw new IllegalArgumentException ( 430 "Topic connection factory cannot be found in JNDI."); 431 432 topic = (javax.jms.Topic )ctx.lookup(topicName); 433 434 if (topic == null) 435 throw new IllegalArgumentException ("Topic cannot be found in JNDI."); 436 437 connection = connectionFactory.createTopicConnection(); 438 439 boolean addressAssigned = false; 440 441 454 455 456 while(!addressAssigned) { 461 try { 462 connection.setClientID(generateLocalAddress()); 463 addressAssigned = true; 464 } catch (javax.jms.IllegalStateException e) { 465 addressAssigned = true; 467 } catch(javax.jms.InvalidClientIDException ex) { 468 } 470 } 471 472 local_addr = new JMSAddress(connection.getClientID(), false); 473 mcast_addr = new JMSAddress(topicName, true); 474 475 session = connection.createTopicSession(false, 476 javax.jms.Session.AUTO_ACKNOWLEDGE); 477 478 publisher = session.createPublisher(topic); 479 publisher.setTimeToLive(timeToLive); 480 481 subscriber = session.createSubscriber(topic); 482 subscriber.setMessageListener(this); 483 484 connection.start(); 485 486 passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 487 } 488 489 493 public void stop() { 494 495 if(log.isInfoEnabled()) log.info("finishing JMS transport layer."); 496 497 try { 498 connection.stop(); 499 subscriber.setMessageListener(null); 500 session.close(); 501 connection.close(); 502 } 503 catch(Throwable ex) { 504 if(log.isErrorEnabled()) log.error("exception is " + ex); 505 } 506 } 507 508 514 protected String generateLocalAddress() throws java.net.UnknownHostException { 515 String hostName = java.net.InetAddress.getLocalHost().getHostName(); 516 517 int rndPort = RND.nextInt(65535); 518 519 return hostName + ':' + rndPort; 520 } 521 522 525 protected static class JMSAddress implements Address { 526 private String address; 527 private boolean isMCast; 528 529 546 public JMSAddress(String address, boolean isMCast) { 547 this.address = address; 548 this.isMCast = isMCast; 549 } 550 551 554 public JMSAddress() { 555 } 556 557 566 public JMSAddress(String str) { 567 if (str.startsWith("#")) { 568 address = str.substring(1); 569 isMCast = false; 570 } else { 571 address = str; 572 isMCast = true; 573 } 574 } 575 576 582 public String getAddress() { return address; } 583 584 589 public void setAddress(String address) { this.address = address; } 590 591 596 public boolean isMulticastAddress() { 597 return isMCast; 598 } 599 600 public int size() { 601 return 22; 602 } 603 604 607 protected Object clone() throws CloneNotSupportedException { 608 return new JMSAddress(address, isMCast); 609 } 610 611 619 public int compareTo(Object o) throws ClassCastException { 620 if (!(o instanceof JMSAddress)) 621 throw new ClassCastException ("Cannot compare different classes."); 622 623 JMSAddress that = (JMSAddress)o; 624 625 if (that.isMCast != this.isMCast) 626 throw new ClassCastException ( 627 "Addresses are different: one is multicast, and one is not"); 628 629 return this.address.compareTo(that.address); 630 } 631 632 639 public boolean equals(Object obj) { 640 if (obj == this) return true; 641 642 if (!(obj instanceof JMSAddress)) 643 return false; 644 645 JMSAddress that = (JMSAddress)obj; 646 647 if (this.isMCast) 648 return this.isMCast == that.isMCast; 649 else 650 if (this.address == null || that.address == null) 651 return false; 652 else 653 return this.address.equals(that.address) && 654 this.isMCast == that.isMCast; 655 } 656 657 662 public int hashCode() { 663 return toString().hashCode(); 664 } 665 666 669 public void readExternal(ObjectInput in) 670 throws IOException, ClassNotFoundException 671 { 672 address = (String )in.readObject(); 673 isMCast = in.readBoolean(); 674 } 675 676 684 public String toString() { 685 return !isMCast ? '#' + address : address; 686 } 687 688 691 public void writeExternal(ObjectOutput out) throws IOException { 692 out.writeObject(address); 693 out.writeBoolean(isMCast); 694 } 695 696 697 public void writeTo(DataOutputStream outstream) throws IOException { 698 outstream.writeUTF(address); 699 outstream.writeBoolean(isMCast); 700 } 701 702 public void readFrom(DataInputStream instream) throws IOException, IllegalAccessException , InstantiationException { 703 address=instream.readUTF(); 704 isMCast=instream.readBoolean(); 705 } 706 } 707 708 } 709 | Popular Tags |