1 3 package org.jgroups.tests; 4 5 6 import org.jgroups.*; 7 import org.jgroups.blocks.MessageDispatcher; 8 import org.jgroups.blocks.RequestHandler; 9 import org.jgroups.blocks.RspCollector; 10 import org.jgroups.debug.Debugger; 11 import org.jgroups.util.RspList; 12 import org.jgroups.util.Util; 13 14 import java.io.IOException ; 15 16 17 24 public class MessageDispatcherTestAsync implements RequestHandler { 25 Channel channel; 26 MessageDispatcher disp; 27 RspList rsp_list; 28 MyCollector coll=new MyCollector(); 29 Debugger debugger=null; 30 boolean debug=false; 31 boolean cummulative=false; 32 boolean done_submitted=true; 33 static final int NUM=10; 34 35 36 String props="UDP(loopback=true;mcast_addr=224.0.0.35;mcast_port=45566;ip_ttl=32;" + 37 "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + 38 "PING(timeout=2000;num_initial_members=3):" + 39 "MERGE2(min_interval=10000;max_interval=20000):" + 40 "FD_SOCK:" + 41 "VERIFY_SUSPECT(timeout=1500):" + 42 "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):" + 43 "UNICAST(timeout=5000):" + 44 "pbcast.STABLE(desired_avg_gossip=20000):" + 45 "FRAG(frag_size=8096;down_thread=false;up_thread=false):" + 46 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + 47 "shun=false;print_local_addr=true)"; 48 49 50 class MyCollector implements RspCollector { 51 52 public void receiveResponse(Message msg) { 53 Object tmp=msg.getObject(); 54 System.out.println("** received response " + tmp + " [sender=" + msg.getSrc() + ']'); 55 } 56 57 public void suspect(Address mbr) { 58 System.out.println("** suspected member " + mbr); 59 } 60 61 public void viewChange(View new_view) { 62 System.out.println("** received new view " + new_view); 63 } 64 } 65 66 67 public MessageDispatcherTestAsync(boolean debug, boolean cummulative) { 68 this.debug=debug; 69 this.cummulative=cummulative; 70 } 71 72 73 public void start() throws Exception { 74 channel=new JChannel(props); 75 if(debug) { 76 debugger=new Debugger((JChannel)channel, cummulative); 77 debugger.start(); 78 } 79 disp=new MessageDispatcher(channel, null, null, this); 81 channel.connect("MessageDispatcherTestAsyncGroup"); 82 } 83 84 85 public void mcast(int num) throws IOException { 86 if(!done_submitted) { 87 System.err.println("Must submit 'done' (press 'd') before mcasting new message"); 88 return; 89 } 90 for(int i=0; i < num; i++) { 91 Util.sleep(100); 92 System.out.println("Casting message #" + i); 93 disp.castMessage(null, 94 i, 95 new Message(null, null, "Number #" + i), 96 coll); 97 } 98 done_submitted=false; 99 } 100 101 102 public void disconnect() { 103 System.out.println("** Disconnecting channel"); 104 channel.disconnect(); 105 System.out.println("** Disconnecting channel -- done"); 106 107 System.out.println("** Closing channel"); 108 channel.close(); 109 System.out.println("** Closing channel -- done"); 110 111 System.out.println("** disp.stop()"); 112 disp.stop(); 113 System.out.println("** disp.stop() -- done"); 114 } 115 116 117 public void done() { 118 for(int i=0; i < NUM; i++) 119 disp.done(i); 120 done_submitted=true; 121 } 122 123 124 public Object handle(Message msg) { 125 Object tmp=msg.getObject(); 126 System.out.println("** handle(" + tmp + ')'); 127 return tmp + ": success"; 128 } 129 130 131 public static void main(String [] args) { 132 int c; 133 MessageDispatcherTestAsync test=null; 134 boolean debug=false, cummulative=false; 135 136 for(int i=0; i < args.length; i++) { 137 if("-help".equals(args[i])) { 138 help(); 139 return; 140 } 141 if("-debug".equals(args[i])) { 142 debug=true; 143 continue; 144 } 145 if("-cummulative".equals(args[i])) { 146 cummulative=true; 147 continue; 148 } 149 } 150 151 152 153 try { 154 test=new MessageDispatcherTestAsync(debug, cummulative); 155 test.start(); 156 while(true) { 157 System.out.println("[m=mcast " + NUM + " msgs x=exit]"); 158 c=System.in.read(); 159 switch(c) { 160 case 'x': 161 test.disconnect(); 162 System.exit(0); 163 return; 164 case 'm': 165 test.mcast(NUM); 166 break; 167 case 'd': 168 test.done(); 169 break; 170 default: 171 break; 172 } 173 174 } 175 } 176 catch(Exception e) { 177 System.err.println(e); 178 } 179 } 180 181 static void help() { 182 System.out.println("MessageDispatcherTestAsync [-debug] [-cummulative]"); 183 } 184 185 } 186 | Popular Tags |