KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > tests > MessageDispatcherTestAsync


1 // $Id: MessageDispatcherTestAsync.java,v 1.8 2004/07/05 14:15:11 belaban Exp $
2

3 package org.jgroups.tests;
4
5
import org.jgroups.*;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.blocks.RspCollector;
import org.jgroups.debug.Debugger;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;

import java.io.IOException;
15
16
17 /**
18  * Asynchronous example for MessageDispatcher; message is mcast to all members, responses are received
19  * asynchronously by calling RspCollector.receiveResponse(). Message is periodically broadcast to all
20  * members; handle() method is invoked whenever a message is received.
21  *
22  * @author Bela Ban
23  */

24 public class MessageDispatcherTestAsync implements RequestHandler {
25     Channel channel;
26     MessageDispatcher disp;
27     RspList rsp_list;
28     MyCollector coll=new MyCollector();
29     Debugger debugger=null;
30     boolean debug=false;
31     boolean cummulative=false;
32     boolean done_submitted=true;
33     static final int NUM=10;
34
35
36     String JavaDoc props="UDP(loopback=true;mcast_addr=224.0.0.35;mcast_port=45566;ip_ttl=32;" +
37             "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
38             "PING(timeout=2000;num_initial_members=3):" +
39             "MERGE2(min_interval=10000;max_interval=20000):" +
40             "FD_SOCK:" +
41             "VERIFY_SUSPECT(timeout=1500):" +
42             "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):" +
43             "UNICAST(timeout=5000):" +
44             "pbcast.STABLE(desired_avg_gossip=20000):" +
45             "FRAG(frag_size=8096;down_thread=false;up_thread=false):" +
46             "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
47             "shun=false;print_local_addr=true)";
48
49
50     class MyCollector implements RspCollector {
51
52         public void receiveResponse(Message msg) {
53             Object JavaDoc tmp=msg.getObject();
54             System.out.println("** received response " + tmp + " [sender=" + msg.getSrc() + ']');
55         }
56
57         public void suspect(Address mbr) {
58             System.out.println("** suspected member " + mbr);
59         }
60
61         public void viewChange(View new_view) {
62             System.out.println("** received new view " + new_view);
63         }
64     }
65
66
67     public MessageDispatcherTestAsync(boolean debug, boolean cummulative) {
68         this.debug=debug;
69         this.cummulative=cummulative;
70     }
71
72
73     public void start() throws Exception JavaDoc {
74         channel=new JChannel(props);
75         if(debug) {
76             debugger=new Debugger((JChannel)channel, cummulative);
77             debugger.start();
78         }
79         //channel.setOpt(Channel.LOCAL, Boolean.FALSE);
80
disp=new MessageDispatcher(channel, null, null, this);
81         channel.connect("MessageDispatcherTestAsyncGroup");
82     }
83
84
85     public void mcast(int num) throws IOException JavaDoc {
86         if(!done_submitted) {
87             System.err.println("Must submit 'done' (press 'd') before mcasting new message");
88             return;
89         }
90         for(int i=0; i < num; i++) {
91             Util.sleep(100);
92             System.out.println("Casting message #" + i);
93             disp.castMessage(null,
94                     i,
95                     new Message(null, null, "Number #" + i),
96                     coll);
97         }
98         done_submitted=false;
99     }
100
101
102     public void disconnect() {
103         System.out.println("** Disconnecting channel");
104         channel.disconnect();
105         System.out.println("** Disconnecting channel -- done");
106
107         System.out.println("** Closing channel");
108         channel.close();
109         System.out.println("** Closing channel -- done");
110
111         System.out.println("** disp.stop()");
112         disp.stop();
113         System.out.println("** disp.stop() -- done");
114     }
115
116
117     public void done() {
118         for(int i=0; i < NUM; i++)
119             disp.done(i);
120         done_submitted=true;
121     }
122
123
124     public Object JavaDoc handle(Message msg) {
125         Object JavaDoc tmp=msg.getObject();
126         System.out.println("** handle(" + tmp + ')');
127         return tmp + ": success";
128     }
129
130
131     public static void main(String JavaDoc[] args) {
132         int c;
133         MessageDispatcherTestAsync test=null;
134         boolean debug=false, cummulative=false;
135
136         for(int i=0; i < args.length; i++) {
137             if("-help".equals(args[i])) {
138                 help();
139                 return;
140             }
141             if("-debug".equals(args[i])) {
142                 debug=true;
143                 continue;
144             }
145             if("-cummulative".equals(args[i])) {
146                 cummulative=true;
147                 continue;
148             }
149         }
150
151
152
153         try {
154             test=new MessageDispatcherTestAsync(debug, cummulative);
155             test.start();
156             while(true) {
157                 System.out.println("[m=mcast " + NUM + " msgs x=exit]");
158                 c=System.in.read();
159                 switch(c) {
160                     case 'x':
161                         test.disconnect();
162                         System.exit(0);
163                         return;
164                     case 'm':
165                         test.mcast(NUM);
166                         break;
167                     case 'd':
168                         test.done();
169                         break;
170                     default:
171                         break;
172                 }
173
174             }
175         }
176         catch(Exception JavaDoc e) {
177             System.err.println(e);
178         }
179     }
180
181     static void help() {
182         System.out.println("MessageDispatcherTestAsync [-debug] [-cummulative]");
183     }
184
185 }
186
Popular Tags