1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.Address; 7 import org.jgroups.Event; 8 import org.jgroups.Message; 9 import org.jgroups.stack.Protocol; 10 11 12 17 public class PERF_TP extends Protocol { 18 private Address local_addr=null; 19 static PERF_TP instance=null; 20 long stop, start; 21 long num_msgs=0; 22 long expected_msgs=0; 23 boolean done=false; 24 25 26 public static PERF_TP getInstance() { 27 return instance; 28 } 29 30 public PERF_TP() { 31 if(instance == null) 32 instance=this; 33 } 34 35 36 public String toString() { 37 return "Protocol PERF_TP (local address: " + local_addr + ')'; 38 } 39 40 public boolean done() { 41 return done; 42 } 43 44 public long getNumMessages() { 45 return num_msgs; 46 } 47 48 public void setExpectedMessages(long m) { 49 expected_msgs=m; 50 num_msgs=0; 51 done=false; 52 start=System.currentTimeMillis(); 53 } 54 55 public void reset() { 56 num_msgs=expected_msgs=stop=start=0; 57 done=false; 58 } 59 60 public long getTotalTime() { 61 return stop-start; 62 } 63 64 65 66 67 public String getName() { 68 return "PERF_TP"; 69 } 70 71 72 73 74 public void init() throws Exception { 75 local_addr=new org.jgroups.stack.IpAddress("localhost", 10000); } 77 78 public void start() throws Exception { 79 passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 80 } 81 82 83 89 public void down(Event evt) { 90 Message msg; 91 Address dest_addr; 92 93 94 switch(evt.getType()) { 95 96 case Event.MSG: 97 if(done) { 98 break; 99 } 100 msg=(Message)evt.getArg(); 101 dest_addr=msg.getDest(); 102 if(dest_addr == null) 103 num_msgs++; 104 if(num_msgs >= expected_msgs) { 105 stop=System.currentTimeMillis(); 106 synchronized(this) { 107 done=true; 108 this.notifyAll(); 109 } 110 if(log.isInfoEnabled()) log.info("all done (num_msgs=" + num_msgs + ", expected_msgs=" + expected_msgs); 111 } 112 break; 113 114 case Event.CONNECT: 115 passUp(new Event(Event.CONNECT_OK)); 116 return; 117 118 case Event.DISCONNECT: 119 passUp(new Event(Event.DISCONNECT_OK)); 120 return; 121 } 122 123 if(down_prot != null) 124 passDown(evt); 125 } 126 127 128 public void up(Event evt) { 129 Message msg; 130 Address dest_addr; 131 switch(evt.getType()) { 132 133 case Event.MSG: 134 if(done) { 135 if(log.isWarnEnabled()) log.warn("all done (discarding msg)"); 136 break; 137 } 138 msg=(Message)evt.getArg(); 139 dest_addr=msg.getDest(); 140 if(dest_addr == null) 141 num_msgs++; 142 if(num_msgs >= expected_msgs) { 143 stop=System.currentTimeMillis(); 144 synchronized(this) { 145 done=true; 146 this.notifyAll(); 147 } 148 if(log.isInfoEnabled()) log.info("all done (num_msgs=" + num_msgs + ", expected_msgs=" + expected_msgs); 149 } 150 return; 151 } 152 passUp(evt); 153 } 154 155 156 157 158 159 160 } 161 | Popular Tags |