KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > GossipClient


1 // $Id: GossipClient.java,v 1.8 2004/12/31 14:34:35 belaban Exp $
2

3 package org.jgroups.stack;
4
5
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.*;
15
16
17 /**
18  * Local stub for clients to access one (or more) GossipServers. Will use proprietary protocol
19  * (using GossipData PDUs) based on TCP to connect to GossipServer.<p>
20  * Requires JDK >= 1.3 due to the use of Timer<p>
21  * todo: make access to multiple GossipServer concurrent (1 thread/GossipServer)
22  * @author Bela Ban Oct 4 2001
23  */

24 public class GossipClient {
25     Timer timer=new Timer();
26     final Hashtable groups=new Hashtable(); // groups - Vector of Addresses
27
Refresher refresher_task=new Refresher();
28     final Vector gossip_servers=new Vector(); // a list of GossipServers (IpAddress)
29
boolean timer_running=false;
30     long EXPIRY_TIME=20000; // must be less than in GossipServer
31

32     protected final Log log=LogFactory.getLog(this.getClass());
33
34
35     /**
36      * Creates the GossipClient
37      * @param gossip_host The address and port of the host on which the GossipServer is running
38      * @param expiry Interval (in msecs) for the refresher task
39      */

40     public GossipClient(IpAddress gossip_host, long expiry) {
41         init(gossip_host, expiry);
42     }
43
44
45     /**
46      Creates the GossipClient
47      @param gossip_hosts List of IpAddresses
48      @param expiry Interval (in msecs) for the refresher task
49      */

50     public GossipClient(Vector gossip_hosts, long expiry) {
51         if(gossip_hosts == null) {
52             if(log.isErrorEnabled()) log.error("empty set of GossipServers given");
53             return;
54         }
55         for(int i=0; i < gossip_hosts.size(); i++)
56             init((IpAddress) gossip_hosts.elementAt(i), expiry);
57     }
58
59
60     public void stop() {
61         timer_running=false;
62         timer.cancel();
63         groups.clear();
64         // provide another refresh tools in case the channel gets reconnected
65
timer=new Timer();
66         refresher_task=new Refresher();
67
68     }
69
70
71     /**
72      * Adds a GossipServer to be accessed.
73      */

74     public void addGossipServer(IpAddress gossip_host) {
75         if(!gossip_servers.contains(gossip_host))
76             gossip_servers.addElement(gossip_host);
77     }
78
79
80     /**
81      Adds the member to the given group. If the group already has an entry for the member,
82      its timestamp will be updated, preventing the cache cleaner from removing the entry.<p>
83      The entry will be registered <em>with all GossipServers that GossipClient is configured to access</em>
84      */

85     public void register(String JavaDoc group, Address mbr) {
86         Vector mbrs;
87
88         if(group == null || mbr == null) {
89             if(log.isErrorEnabled()) log.error("group or mbr is null");
90             return;
91         }
92         mbrs=(Vector) groups.get(group);
93         if(mbrs == null) {
94             mbrs=new Vector();
95             mbrs.addElement(mbr);
96             groups.put(group, mbrs);
97         }
98         else {
99             if(!mbrs.contains(mbr))
100                 mbrs.addElement(mbr);
101         }
102
103         _register(group, mbr); // update entry in GossipServer
104

105         if(!timer_running) {
106             timer.schedule(refresher_task, EXPIRY_TIME, EXPIRY_TIME);
107             timer_running=true;
108         }
109     }
110
111
112     /**
113      Returns all members of a given group
114      @param group The group name
115      @return Vector A list of Addresses
116      */

117     public Vector getMembers(String JavaDoc group) {
118         if(group == null) {
119             if(log.isErrorEnabled()) log.error("group is null");
120             return null;
121         }
122
123         return _getMembers(group);
124     }
125
126
127
128     /* ------------------------------------- Private methods ----------------------------------- */
129
130
131     void init(IpAddress gossip_host, long expiry) {
132         EXPIRY_TIME=expiry;
133         addGossipServer(gossip_host);
134     }
135
136
137     /**
138      * Registers the group|mbr with *all* GossipServers
139      * todo Parallelize GossipServer access
140      */

141     void _register(String JavaDoc group, Address mbr) {
142         Socket JavaDoc sock;
143         ObjectOutputStream JavaDoc out;
144         IpAddress entry;
145         GossipData gossip_req;
146
147         for(int i=0; i < gossip_servers.size(); i++) {
148             entry=(IpAddress) gossip_servers.elementAt(i);
149             if(entry.getIpAddress() == null || entry.getPort() == 0) {
150                 if(log.isErrorEnabled()) log.error("entry.host or entry.port is null");
151                 continue;
152             }
153             try {
154                 if(log.isTraceEnabled())
155                     log.trace("REGISTER_REQ --> " + entry.getIpAddress() + ':' + entry.getPort());
156                 sock=new Socket JavaDoc(entry.getIpAddress(), entry.getPort());
157                 out=new ObjectOutputStream JavaDoc(sock.getOutputStream());
158                 gossip_req=new GossipData(GossipData.REGISTER_REQ, group, mbr, null);
159                 // must send GossipData as fast as possible, otherwise the
160
// request might be rejected
161
out.writeObject(gossip_req);
162                 out.flush();
163                 sock.close();
164             }
165             catch(Exception JavaDoc ex) {
166                 if(log.isErrorEnabled()) log.error("exception connecting to host " + entry + ": " + ex);
167             }
168         }
169     }
170
171     /**
172      * Sends a GET_MBR_REQ to *all* GossipServers, merges responses.
173      */

174     Vector _getMembers(String JavaDoc group) {
175         Vector ret=new Vector();
176         Socket JavaDoc sock;
177         ObjectOutputStream JavaDoc out;
178         ObjectInputStream JavaDoc in;
179         IpAddress entry;
180         GossipData gossip_req, gossip_rsp;
181         Address mbr;
182
183         for(int i=0; i < gossip_servers.size(); i++) {
184             entry=(IpAddress) gossip_servers.elementAt(i);
185             if(entry.getIpAddress() == null || entry.getPort() == 0) {
186                 if(log.isErrorEnabled()) log.error("entry.host or entry.port is null");
187                 continue;
188             }
189             try {
190
191                 if(log.isTraceEnabled()) log.trace("GET_REQ --> " + entry.getIpAddress() + ':' + entry.getPort());
192                 sock=new Socket JavaDoc(entry.getIpAddress(), entry.getPort());
193                 out=new ObjectOutputStream JavaDoc(sock.getOutputStream());
194
195                 gossip_req=new GossipData(GossipData.GET_REQ, group, null, null);
196                 // must send GossipData as fast as possible, otherwise the
197
// request might be rejected
198
out.writeObject(gossip_req);
199                 out.flush();
200
201                 in=new ObjectInputStream JavaDoc(sock.getInputStream());
202                 gossip_rsp=(GossipData) in.readObject();
203                 if(gossip_rsp.mbrs != null) { // merge with ret
204
for(int j=0; j < gossip_rsp.mbrs.size(); j++) {
205                         mbr=(Address) gossip_rsp.mbrs.elementAt(j);
206                         if(!ret.contains(mbr))
207                             ret.addElement(mbr);
208                     }
209                 }
210
211
212                 sock.close();
213             }
214             catch(Exception JavaDoc ex) {
215                 if(log.isErrorEnabled()) log.error("exception connecting to host " + entry + ": " + ex);
216             }
217         }
218
219         return ret;
220     }
221
222     /* ---------------------------------- End of Private methods ------------------------------- */
223
224
225
226     /**
227      * Periodically iterates through groups and refreshes all registrations with GossipServer
228      */

229     private class Refresher extends TimerTask {
230
231         public void run() {
232             int num_items=0;
233             String JavaDoc group;
234             Vector mbrs;
235             Address mbr;
236
237              if(log.isTraceEnabled()) log.trace("refresher task is run");
238             for(Enumeration e=groups.keys(); e.hasMoreElements();) {
239                 group=(String JavaDoc) e.nextElement();
240                 mbrs=(Vector) groups.get(group);
241                 if(mbrs != null) {
242                     for(int i=0; i < mbrs.size(); i++) {
243                         mbr=(Address) mbrs.elementAt(i);
244                         if(log.isTraceEnabled()) log.trace("registering " + group + " : " + mbr);
245                         register(group, mbr);
246                         num_items++;
247                     }
248                 }
249             }
250             if(log.isTraceEnabled()) log.trace("refresher task done. Registered " + num_items + " items");
251         }
252
253     }
254
255
256     public static void main(String JavaDoc[] args) {
257         Vector gossip_hosts=new Vector();
258         String JavaDoc host;
259         InetAddress JavaDoc ip_addr;
260         int port;
261         boolean get=false, register=false, keep_running=false;
262         String JavaDoc register_host=null;
263         int register_port=0;
264         String JavaDoc get_group=null, register_group=null;
265         GossipClient gossip_client=null;
266         Vector mbrs;
267         long expiry=20000;
268
269
270         for(int i=0; i < args.length; i++) {
271             if("-help".equals(args[i])) {
272                 usage();
273                 return;
274             }
275             if("-expiry".equals(args[i])) {
276                 expiry=Long.parseLong(args[++i]);
277                 continue;
278             }
279             if("-host".equals(args[i])) {
280                 host=args[++i];
281                 port=Integer.parseInt(args[++i]);
282                 try {
283                     ip_addr=InetAddress.getByName(host);
284                     gossip_hosts.addElement(new IpAddress(ip_addr, port));
285                 }
286                 catch(Exception JavaDoc ex) {
287                     System.err.println(ex);
288                 }
289                 continue;
290             }
291             if("-keep_running".equals(args[i])) {
292                 keep_running=true;
293                 continue;
294             }
295             if("-get".equals(args[i])) {
296                 get=true;
297                 get_group=args[++i];
298                 continue;
299             }
300             if("-register".equals(args[i])) {
301                 register_group=args[++i];
302                 register_host=args[++i];
303                 register_port=Integer.parseInt(args[++i]);
304                 register=true;
305                 continue;
306             }
307             usage();
308             return;
309         }
310
311         if(gossip_hosts.size() == 0) {
312             System.err.println("At least 1 GossipServer has to be given");
313             return;
314         }
315
316         if(!register && !get) {
317             System.err.println("Neither get nor register command given, will not do anything");
318             return;
319         }
320
321         try {
322
323         }
324         catch(Throwable JavaDoc ex) {
325             System.err.println("GossipClient.main(): error initailizing JGroups Trace: " + ex);
326         }
327
328         try {
329             gossip_client=new GossipClient(gossip_hosts, expiry);
330             if(register) {
331                 System.out.println("Registering " + register_group + " --> " + register_host + ':' + register_port);
332                 gossip_client.register(register_group, new IpAddress(register_host, register_port));
333             }
334
335             if(get) {
336                 System.out.println("Getting members for group " + get_group);
337                 mbrs=gossip_client.getMembers(get_group);
338                 System.out.println("Members for group " + get_group + " are " + mbrs);
339             }
340         }
341         catch(Exception JavaDoc ex) {
342             System.err.println(ex);
343         }
344         if(!keep_running)
345             gossip_client.stop();
346     }
347
348
349     static void usage() {
350         System.out.println("GossipClient [-help] [-host <hostname> <port>]+ " +
351                            " [-get <groupname>] [-register <groupname hostname port>] [-expiry <msecs>] " +
352                            "[-keep_running]]");
353     }
354
355 }
356
Popular Tags