1 3 4 package org.jgroups.tests; 5 6 7 import org.jgroups.*; 8 import org.jgroups.blocks.GroupRequest; 9 import org.jgroups.blocks.RpcDispatcher; 10 import org.jgroups.util.RspList; 11 import org.jgroups.util.Util; 12 13 14 15 21 public class RpcDispatcherStressTest implements MembershipListener { 22 Channel channel; 23 RpcDispatcher disp; 24 RspList rsp_list; 25 Publisher[] threads=null; 26 int[] results; 27 28 public int print(int number) throws Exception { 29 return number * 2; 30 } 31 32 33 public void start(String props, int num_threads, long interval, boolean discard_local) throws Exception { 34 channel=new JChannel(props); 35 if(discard_local) 36 channel.setOpt(Channel.LOCAL, Boolean.FALSE); 37 disp=new RpcDispatcher(channel, null, this, this); 38 channel.connect("RpcDispatcherStressTestGroup"); 39 40 threads=new Publisher[num_threads]; 41 results=new int[num_threads]; 42 for(int i=0; i < threads.length; i++) { 43 threads[i]=new Publisher(i, interval); 44 results[i]=0; 45 } 46 47 System.out.println("-- Created " + threads.length + " threads. Press enter to start them " + 48 "('-' for sent message, '+' for received message)"); 49 System.out.println("-- Press enter to stop the threads"); 50 51 System.out.flush(); 52 System.in.read(); 53 System.in.skip(System.in.available()); 54 55 for(int i=0; i < threads.length; i++) 56 threads[i].start(); 57 58 System.out.flush(); 59 System.in.read(); 60 System.in.skip(System.in.available()); 61 62 for(int i=0; i < threads.length; i++) { 63 threads[i].stopThread(); 64 threads[i].join(2000); 65 } 66 67 System.out.println("\n"); 68 for(int i=0; i < threads.length; i++) { 69 System.out.println("-- thread #" + i + ": called remote method " + results[i] + " times"); 70 } 71 72 73 System.out.println("Closing channel"); 74 channel.close(); 75 System.out.println("Closing channel: -- done"); 76 77 System.out.println("Stopping dispatcher"); 78 disp.stop(); 79 System.out.println("Stopping dispatcher: -- done"); 80 } 81 82 83 84 85 public void viewAccepted(View new_view) { 86 System.out.println("-- new view: " + new_view); 87 } 88 89 public void suspect(Address suspected_mbr) { 90 System.out.println("-- suspected " + suspected_mbr); 91 } 92 93 94 95 public void block() { 96 ; 97 } 98 99 100 101 102 103 class Publisher extends Thread { 104 int rank=0; 105 boolean running=true; 106 int num_calls=0; 107 long interval=1000; 108 109 Publisher(int rank, long interval) { 110 super(); 111 setDaemon(true); 112 this.rank=rank; 113 this.interval=interval; 114 } 115 116 public void stopThread() { 117 running=false; 118 } 119 120 public void run() { 121 while(running) { 122 System.out.print(rank + "- "); 123 disp.callRemoteMethods(null, "print", new Object []{new Integer (num_calls)}, 124 new Class []{int.class}, GroupRequest.GET_ALL, 0); 125 num_calls++; 126 System.out.print(rank + "+ "); 127 Util.sleep(interval); 128 } 129 results[rank]=num_calls; 130 } 131 } 132 133 134 135 136 public static void main(String [] args) { 137 String props; 138 int num_threads=1; 139 long interval=1000; 140 boolean discard_local=false; 141 142 props="UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" + 143 "ucast_recv_buf_size=16000;ucast_send_buf_size=16000;" + 144 "mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):"+ 145 "PING(timeout=2000;num_initial_members=3):"+ 146 "MERGE2(min_interval=5000;max_interval=10000):"+ 147 "FD_SOCK:"+ 148 "VERIFY_SUSPECT(timeout=1500):"+ 149 "pbcast.NAKACK(gc_lag=50;retransmit_timeout=1000,1500,2000,3000;max_xmit_size=8192):"+ 150 "UNICAST(timeout=1000,1500,2000,3000):"+ 151 "pbcast.STABLE(desired_avg_gossip=10000):"+ 152 "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+ 153 "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true):"+ 154 "pbcast.STATE_TRANSFER"; 155 156 try { 157 for(int i=0; i < args.length; i++) { 158 if("-num_threads".equals(args[i])) { 159 num_threads=Integer.parseInt(args[++i]); 160 continue; 161 } 162 if("-interval".equals(args[i])) { 163 interval=Long.parseLong(args[++i]); 164 continue; 165 } 166 if("-props".equals(args[i])) { 167 props=args[++i]; 168 continue; 169 } 170 if("-discard_local".equals(args[i])) { 171 discard_local=true; 172 continue; 173 } 174 help(); 175 return; 176 } 177 178 179 new RpcDispatcherStressTest().start(props, num_threads, interval, discard_local); 180 } 181 catch(Exception e) { 182 System.err.println(e); 183 } 184 } 185 186 187 static void help() { 188 System.out.println("RpcDispatcherStressTest [-help] [-interval <msecs>] " + 189 "[-num_threads <number>] [-props <stack properties>] [-discard_local]"); 190 } 191 } 192 | Popular Tags |