KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > adaptjms > JmsTester


1 package org.jgroups.tests.adaptjms;
2
3 import org.jgroups.stack.IpAddress;
4 import org.jgroups.util.Util;
5
6 import javax.jms.*;
7 import java.util.ArrayList JavaDoc;
8 import java.util.List JavaDoc;
9
10
11
12 /** Javagroups version used was 2.0.3. Recompiled and tested again with 2.0.6.
13  * JGroupsTester:
14  * 1. Instantiates a JChannel object and joins the group.
15  * Partition properties conf. is the same as in the JBoss
16  * default configuration except for min_wait_time parameter
17  * that causes the following error:
18  * UNICAST.setProperties():
19  * these properties are not recognized:
20  * -- listing properties --
21  * min_wait_time=2000
22  * 2. Starts receiving until it receives a view change message
23  * with the expected number of members.
24  * 3. Starts the receiver thread and if(sender), the sender thread.
25  * @author Milcan Prica (prica@deei.units.it)
26  * @author Bela Ban (belaban@yahoo.com)
27  * @version $Id: JmsTester.java,v 1.3 2004/07/05 14:15:07 belaban Exp $
28  */

29 public class JmsTester {
30     private boolean sender;
31     private int num_msgs;
32     private int msg_size;
33     private int num_senders;
34     private long log_interval=1000;
35     Connection conn;
36     TopicSession session;
37     TopicPublisher pub;
38     Topic topic;
39     int num_members;
40     Object JavaDoc local_addr;
41     MyReceiver receiver=null;
42
43     /** List<Address> . Contains member addresses */
44     List JavaDoc members=new ArrayList JavaDoc();
45
46
47
48     public JmsTester(Connection conn, TopicSession session, Topic topic, TopicPublisher pub, boolean snd, int num_msgs,
49                      int msg_size, int num_members, int ns, long log_interval) {
50         sender=snd;
51         this.num_msgs=num_msgs;
52         this.msg_size=msg_size;
53         num_senders=ns;
54         this.num_members=num_members;
55         this.log_interval=log_interval;
56         this.conn=conn;
57         this.session=session;
58         this.topic=topic;
59         this.pub=pub;
60     }
61
62     public void initialize() throws Exception JavaDoc {
63         this.local_addr=conn.getClientID();
64         waitUntilAllMembersHaveJoined();
65         Util.sleep(1000);
66
67         conn.start();
68         new ReceiverThread(session, topic, num_msgs, msg_size, num_senders, log_interval).start();
69         if(sender) {
70             new SenderThread(session, pub, topic, num_msgs, msg_size, log_interval).start();
71         }
72     }
73
74     void waitUntilAllMembersHaveJoined() throws Exception JavaDoc {
75         discoverExistingMembers();
76
77     }
78
79     private void discoverExistingMembers() throws Exception JavaDoc {
80         receiver=new MyReceiver();
81         members.clear();
82         receiver.start();
83         receiver.discoverExistingMembers();
84         receiver.sendMyAddress();
85         receiver.waitUntilAllMembersHaveJoined();
86     }
87
88
89
90     class MyReceiver implements MessageListener {
91         boolean running=true;
92         TopicSubscriber sub;
93
94
95         public void start() throws JMSException {
96             sub=session.createSubscriber(topic);
97             sub.setMessageListener(this);
98         }
99
100         public void onMessage(Message message) {
101             Request req;
102
103             if(message instanceof ObjectMessage) {
104                 req=(Request)message;
105                 switch(req.type) {
106                     case Request.DISCOVERY_REQ:
107                         Request rsp=new Request(Request.NEW_MEMBER, local_addr);
108                         ObjectMessage msg=null;
109                         try {
110                             msg=session.createObjectMessage(rsp);
111                             pub.publish(msg);
112                         }
113                         catch(JMSException e) {
114                             e.printStackTrace();
115                         }
116                         break;
117                     case Request.NEW_MEMBER:
118                         IpAddress new_mbr=(IpAddress)req.arg;
119                         if(!members.contains(new_mbr)) {
120                             members.add(new_mbr);
121                             System.out.println("-- discovered " + new_mbr);
122                             if(members.size() >= num_members) {
123                                 System.out.println("-- all members have joined (" + members + ')');
124                                 running=false;
125                                 synchronized(this) {
126                                     if(sub != null) {
127                                         try {
128                                             sub.setMessageListener(null);
129                                         }
130                                         catch(JMSException e) {
131                                             e.printStackTrace();
132                                         }
133                                     }
134                                     this.notifyAll();
135                                 }
136                                 break;
137                             }
138                         }
139                         break;
140                     default:
141                         System.err.println("don't recognize request with type=" + req.type);
142                         break;
143                 }
144             }
145         }
146
147
148         public void discoverExistingMembers() throws Exception JavaDoc {
149             Request req=new Request(Request.DISCOVERY_REQ, null);
150             ObjectMessage msg=session.createObjectMessage(req);
151             pub.publish(msg);
152         }
153
154         public void sendMyAddress() throws Exception JavaDoc {
155             Request req=new Request(Request.NEW_MEMBER, local_addr);
156             ObjectMessage msg=session.createObjectMessage(req);
157             pub.publish(msg);
158         }
159
160         public void waitUntilAllMembersHaveJoined() throws InterruptedException JavaDoc {
161             if(members.size() < num_members) {
162                 synchronized(receiver) {
163                     receiver.wait();
164                 }
165             }
166         }
167
168     }
169
170 }
171
172
Popular Tags