1 package org.jgroups.tests.adaptjms; 2 3 import org.jgroups.stack.IpAddress; 4 import org.jgroups.util.Util; 5 6 import javax.jms.*; 7 import java.util.ArrayList ; 8 import java.util.List ; 9 10 11 12 29 public class JmsTester { 30 private boolean sender; 31 private int num_msgs; 32 private int msg_size; 33 private int num_senders; 34 private long log_interval=1000; 35 Connection conn; 36 TopicSession session; 37 TopicPublisher pub; 38 Topic topic; 39 int num_members; 40 Object local_addr; 41 MyReceiver receiver=null; 42 43 44 List members=new ArrayList (); 45 46 47 48 public JmsTester(Connection conn, TopicSession session, Topic topic, TopicPublisher pub, boolean snd, int num_msgs, 49 int msg_size, int num_members, int ns, long log_interval) { 50 sender=snd; 51 this.num_msgs=num_msgs; 52 this.msg_size=msg_size; 53 num_senders=ns; 54 this.num_members=num_members; 55 this.log_interval=log_interval; 56 this.conn=conn; 57 this.session=session; 58 this.topic=topic; 59 this.pub=pub; 60 } 61 62 public void initialize() throws Exception { 63 this.local_addr=conn.getClientID(); 64 waitUntilAllMembersHaveJoined(); 65 Util.sleep(1000); 66 67 conn.start(); 68 new ReceiverThread(session, topic, num_msgs, msg_size, num_senders, log_interval).start(); 69 if(sender) { 70 new SenderThread(session, pub, topic, num_msgs, msg_size, log_interval).start(); 71 } 72 } 73 74 void waitUntilAllMembersHaveJoined() throws Exception { 75 discoverExistingMembers(); 76 77 } 78 79 private void discoverExistingMembers() throws Exception { 80 receiver=new MyReceiver(); 81 members.clear(); 82 receiver.start(); 83 receiver.discoverExistingMembers(); 84 receiver.sendMyAddress(); 85 receiver.waitUntilAllMembersHaveJoined(); 86 } 87 88 89 90 class MyReceiver implements MessageListener { 91 boolean running=true; 92 TopicSubscriber sub; 93 94 95 public void start() throws JMSException { 96 sub=session.createSubscriber(topic); 97 sub.setMessageListener(this); 98 } 99 100 public void onMessage(Message message) { 101 Request req; 102 103 if(message instanceof ObjectMessage) { 104 req=(Request)message; 105 switch(req.type) { 106 case Request.DISCOVERY_REQ: 107 Request rsp=new Request(Request.NEW_MEMBER, local_addr); 108 ObjectMessage msg=null; 109 try { 110 msg=session.createObjectMessage(rsp); 111 pub.publish(msg); 112 } 113 catch(JMSException e) { 114 e.printStackTrace(); 115 } 116 break; 117 case Request.NEW_MEMBER: 118 IpAddress new_mbr=(IpAddress)req.arg; 119 if(!members.contains(new_mbr)) { 120 members.add(new_mbr); 121 System.out.println("-- discovered " + new_mbr); 122 if(members.size() >= num_members) { 123 System.out.println("-- all members have joined (" + members + ')'); 124 running=false; 125 synchronized(this) { 126 if(sub != null) { 127 try { 128 sub.setMessageListener(null); 129 } 130 catch(JMSException e) { 131 e.printStackTrace(); 132 } 133 } 134 this.notifyAll(); 135 } 136 break; 137 } 138 } 139 break; 140 default: 141 System.err.println("don't recognize request with type=" + req.type); 142 break; 143 } 144 } 145 } 146 147 148 public void discoverExistingMembers() throws Exception { 149 Request req=new Request(Request.DISCOVERY_REQ, null); 150 ObjectMessage msg=session.createObjectMessage(req); 151 pub.publish(msg); 152 } 153 154 public void sendMyAddress() throws Exception { 155 Request req=new Request(Request.NEW_MEMBER, local_addr); 156 ObjectMessage msg=session.createObjectMessage(req); 157 pub.publish(msg); 158 } 159 160 public void waitUntilAllMembersHaveJoined() throws InterruptedException { 161 if(members.size() < num_members) { 162 synchronized(receiver) { 163 receiver.wait(); 164 } 165 } 166 } 167 168 } 169 170 } 171 172 | Popular Tags |