KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > PERF_TP


1 // $Id: PERF_TP.java,v 1.10 2005/04/15 16:17:49 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.Address;
7 import org.jgroups.Event;
8 import org.jgroups.Message;
9 import org.jgroups.stack.Protocol;
10
11
12 /**
13  * Measures the time for a message to travel from the channel to the transport
14  * @author Bela Ban
15  * @version $Id: PERF_TP.java,v 1.10 2005/04/15 16:17:49 belaban Exp $
16  */

17 public class PERF_TP extends Protocol {
18     private Address local_addr=null;
19     static PERF_TP instance=null;
20     long stop, start;
21     long num_msgs=0;
22     long expected_msgs=0;
23     boolean done=false;
24
25
26     public static PERF_TP getInstance() {
27         return instance;
28     }
29
30     public PERF_TP() {
31         if(instance == null)
32             instance=this;
33     }
34
35
36     public String JavaDoc toString() {
37         return "Protocol PERF_TP (local address: " + local_addr + ')';
38     }
39
40     public boolean done() {
41         return done;
42     }
43
44     public long getNumMessages() {
45         return num_msgs;
46     }
47
48     public void setExpectedMessages(long m) {
49         expected_msgs=m;
50         num_msgs=0;
51         done=false;
52         start=System.currentTimeMillis();
53     }
54
55     public void reset() {
56         num_msgs=expected_msgs=stop=start=0;
57         done=false;
58     }
59
60     public long getTotalTime() {
61         return stop-start;
62     }
63
64
65     /*------------------------------ Protocol interface ------------------------------ */
66
67     public String JavaDoc getName() {
68         return "PERF_TP";
69     }
70
71
72
73
74     public void init() throws Exception JavaDoc {
75         local_addr=new org.jgroups.stack.IpAddress("localhost", 10000); // fake address
76
}
77
78     public void start() throws Exception JavaDoc {
79         passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
80     }
81
82
83     /**
84      * Caller by the layer above this layer. Usually we just put this Message
85      * into the send queue and let one or more worker threads handle it. A worker thread
86      * then removes the Message from the send queue, performs a conversion and adds the
87      * modified Message to the send queue of the layer below it, by calling Down).
88      */

89     public void down(Event evt) {
90         Message msg;
91         Address dest_addr;
92
93
94         switch(evt.getType()) {
95
96         case Event.MSG:
97             if(done) {
98                 break;
99             }
100             msg=(Message)evt.getArg();
101             dest_addr=msg.getDest();
102             if(dest_addr == null)
103                 num_msgs++;
104             if(num_msgs >= expected_msgs) {
105                 stop=System.currentTimeMillis();
106                 synchronized(this) {
107                     done=true;
108                     this.notifyAll();
109                 }
110                 if(log.isInfoEnabled()) log.info("all done (num_msgs=" + num_msgs + ", expected_msgs=" + expected_msgs);
111             }
112             break;
113
114         case Event.CONNECT:
115             passUp(new Event(Event.CONNECT_OK));
116             return;
117
118         case Event.DISCONNECT:
119             passUp(new Event(Event.DISCONNECT_OK));
120             return;
121         }
122
123         if(down_prot != null)
124             passDown(evt);
125     }
126
127
128     public void up(Event evt) {
129         Message msg;
130         Address dest_addr;
131         switch(evt.getType()) {
132
133         case Event.MSG:
134             if(done) {
135                 if(log.isWarnEnabled()) log.warn("all done (discarding msg)");
136                 break;
137             }
138             msg=(Message)evt.getArg();
139             dest_addr=msg.getDest();
140             if(dest_addr == null)
141                 num_msgs++;
142             if(num_msgs >= expected_msgs) {
143                 stop=System.currentTimeMillis();
144                 synchronized(this) {
145                     done=true;
146                     this.notifyAll();
147                 }
148                 if(log.isInfoEnabled()) log.info("all done (num_msgs=" + num_msgs + ", expected_msgs=" + expected_msgs);
149             }
150             return;
151         }
152         passUp(evt);
153     }
154
155     /*--------------------------- End of Protocol interface -------------------------- */
156
157
158
159
160 }
161
Popular Tags