KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > GossipServer


1 // $Id: GossipServer.java,v 1.7 2004/09/23 16:29:53 belaban Exp $
2

3 package org.jgroups.stack;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.Address;
8
9 import java.io.ObjectInputStream JavaDoc;
10 import java.io.ObjectOutputStream JavaDoc;
11 import java.net.InetAddress JavaDoc;
12 import java.net.ServerSocket JavaDoc;
13 import java.net.Socket JavaDoc;
14 import java.util.*;
15
16
17 /**
18  * Maintains a cache of member addresses for each group. There are essentially 2 functions: get the members for
19  * a given group and register a new member for a given group. Clients have to periodically renew their
20  * registrations (like in JINI leasing), otherwise the cache will be cleaned periodically (oldest entries first).<p>
21  * The server should be running at a well-known port. This can be done by for example adding an entry to
22  * /etc/inetd.conf on UNIX systems, e.g. <code>gossipsrv stream tcp nowait root /bin/start-gossip-server</code>.
23  * <code>gossipsrv</code> has to be defined in /etc/services and <code>start-gossip-server</code> is a script
24  * which starts the GossipServer at the well-known port (define in /etc/services). The protocol between GossipServer
25  * and GossipClient consists of REGISTER_REQ, GET_MEMBERS_REQ and GET_MEMBERS_RSP protocol data units.<p>
26  * The server does not spawn a thread/request, but does all of its processing on the main thread. This should not
27  * be a problem as all requests are short-lived. However, the server would essentially cease processing requests
28  * if a telnet connected to it.<p>
29  * Requires JDK >= 1.3 due to the use of Timer
30  * @author Bela Ban Oct 4 2001
31  */

32 public class GossipServer {
33     final Hashtable groups=new Hashtable(); // groupname - vector of Entry's
34
int port=7500;
35     ServerSocket JavaDoc srv_sock=null;
36     long EXPIRY_TIME=30000; // time (in msecs) until a cache entry expires
37
CacheCleaner cache_cleaner=null; // task that is periodically invoked to sweep old entries from the cache
38
final Timer timer=new Timer(true); // start as daemon thread, so we won't block on it upon termination
39
InetAddress JavaDoc bind_address=null;
40     protected final Log log=LogFactory.getLog(getClass());
41
42
43     public GossipServer(int port) throws Exception JavaDoc {
44         this.port=port;
45         init();
46     }
47
48
49     public GossipServer(int port, long expiry_time) throws Exception JavaDoc {
50         this.port=port;
51         EXPIRY_TIME=expiry_time;
52         init();
53     }
54
55     public GossipServer(int port, long expiry_time, InetAddress JavaDoc bind_address) throws Exception JavaDoc {
56         this.port=port;
57         this.bind_address=bind_address;
58         EXPIRY_TIME=expiry_time;
59         init();
60     }
61
62
63     public void run() {
64         Socket JavaDoc sock;
65         ObjectInputStream JavaDoc input;
66         ObjectOutputStream JavaDoc output=null;
67         GossipData gossip_req, gossip_rsp;
68         boolean looping=true;
69
70         while(looping) {
71             try {
72                 sock=srv_sock.accept();
73                 if(log.isInfoEnabled()) log.info("accepted connection from " + sock.getInetAddress() +
74                                                  ':' + sock.getPort());
75                 sock.setSoLinger(true, 500);
76                 input=new ObjectInputStream JavaDoc(sock.getInputStream());
77                 gossip_req=(GossipData) input.readObject();
78                 gossip_rsp=processGossip(gossip_req);
79                 if(gossip_rsp != null) {
80                     output=new ObjectOutputStream JavaDoc(sock.getOutputStream());
81                     output.writeObject(gossip_rsp);
82                     output.flush();
83                     output.close();
84                 }
85                 input.close();
86                 sock.close();
87                 looping=false;
88             }
89             catch(Exception JavaDoc ex) {
90                 if(log.isErrorEnabled()) log.error("exception=" + ex);
91                 ex.printStackTrace(); // +++ remove
92
continue;
93             }
94         }
95     }
96
97
98     /* ----------------------------------- Private methods ----------------------------------- */
99
100
101
102     void init() throws Exception JavaDoc {
103         if(bind_address == null) {
104             srv_sock=new ServerSocket JavaDoc(port, 20); // backlog of 20 connections
105
bind_address=srv_sock.getInetAddress();
106         }
107         else
108             srv_sock=new ServerSocket JavaDoc(port, 20, bind_address); // backlog of 20 connections
109
{
110             if(log.isInfoEnabled()) log.info("GossipServer was created at " + new Date());
111             if(log.isInfoEnabled()) log.info("Listening on port " + port + " bound on address " + bind_address);
112         }
113         cache_cleaner=new CacheCleaner();
114         timer.schedule(cache_cleaner, EXPIRY_TIME, EXPIRY_TIME);
115     }
116
117
118     /**
119      Process the gossip request. Return a gossip response or null if none.
120      */

121     GossipData processGossip(GossipData gossip) {
122         String JavaDoc group;
123         Address mbr;
124
125         if(gossip == null) return null;
126          if(log.isInfoEnabled()) log.info(gossip.toString());
127         switch(gossip.getType()) {
128             case GossipData.REGISTER_REQ:
129                 group=gossip.getGroup();
130                 mbr=gossip.getMbr();
131                 if(group == null || mbr == null) {
132                     if(log.isErrorEnabled()) log.error("group or member is null, cannot register member");
133                     return null;
134                 }
135                 return processRegisterRequest(group, mbr);
136
137             case GossipData.GET_REQ:
138                 group=gossip.getGroup();
139                 if(group == null) {
140                     if(log.isErrorEnabled()) log.error("group is null, cannot get membership");
141                     return null;
142                 }
143                 return processGetRequest(group);
144
145             case GossipData.GET_RSP: // should not be received
146
if(log.isWarnEnabled()) log.warn("received a GET_RSP. Should not be received by server");
147                 return null;
148
149             default:
150                 if(log.isWarnEnabled()) log.warn("received unkown gossip request (gossip=" + gossip + ')');
151                 return null;
152         }
153     }
154
155
156     GossipData processRegisterRequest(String JavaDoc group, Address mbr) {
157         addMember(group, mbr);
158         return null;
159     }
160
161
162     GossipData processGetRequest(String JavaDoc group) {
163         GossipData ret=null;
164         Vector mbrs=getMembers(group);
165
166         ret=new GossipData(GossipData.GET_RSP, group, null, mbrs);
167
168             if(log.isInfoEnabled()) log.info("members are " + mbrs +
169                                                            ", gossip_rsp=" + ret);
170         return ret;
171     }
172
173
174     /**
175      Adds a member to the list for the given group. If the group doesn't exist, it will be created. If the member
176      is already present, its timestamp will be updated. Otherwise the member will be added.
177      @param group The group name. Guaranteed to be non-null
178      @param mbr The member's address. Guaranteed to be non-null
179      */

180     void addMember(String JavaDoc group, Address mbr) {
181         Vector mbrs=(Vector) groups.get(group);
182         Entry entry;
183
184         if(mbrs == null) {
185             mbrs=new Vector();
186             mbrs.addElement(new Entry(mbr));
187             groups.put(group, mbrs);
188              if(log.isInfoEnabled()) log.info("added " + mbr + " to " + group + " (new group)");
189         }
190         else {
191             entry=findEntry(mbrs, mbr);
192             if(entry == null) {
193                 entry=new Entry(mbr);
194                 mbrs.addElement(entry);
195                  if(log.isInfoEnabled()) log.info("added " + mbr + " to " + group);
196             }
197             else {
198                 entry.update();
199                  if(log.isInfoEnabled()) log.info("updated entry " + entry);
200             }
201         }
202     }
203
204
205     Vector getMembers(String JavaDoc group) {
206         Vector ret=null;
207         Vector mbrs=(Vector) groups.get(group);
208
209         if(mbrs == null)
210             return null;
211         ret=new Vector();
212         for(int i=0; i < mbrs.size(); i++)
213             ret.addElement(((Entry) mbrs.elementAt(i)).mbr);
214         return ret;
215     }
216
217
218     Entry findEntry(Vector mbrs, Address mbr) {
219         Entry entry=null;
220
221         for(int i=0; i < mbrs.size(); i++) {
222             entry=(Entry) mbrs.elementAt(i);
223             if(entry.mbr != null && entry.mbr.equals(mbr))
224                 return entry;
225         }
226         return null;
227     }
228
229
230     /**
231      * Remove expired entries (entries older than EXPIRY_TIME msec).
232      */

233     void sweep() {
234         long current_time=System.currentTimeMillis(), diff;
235         int num_entries_removed=0;
236         String JavaDoc key=null;
237         Vector val;
238         Entry entry;
239
240         for(Enumeration e=groups.keys(); e.hasMoreElements();) {
241             key=(String JavaDoc) e.nextElement();
242             val=(Vector) groups.get(key);
243             if(val != null) {
244                 for(Iterator it=val.listIterator(); it.hasNext();) {
245                     entry=(Entry) it.next();
246                     diff=current_time - entry.timestamp;
247                     if(entry.timestamp + EXPIRY_TIME < current_time) {
248                         it.remove();
249
250                             if(log.isInfoEnabled()) log.info("removed member " + entry +
251                                                                " from group " + key + '(' + diff + " msecs old)");
252                         num_entries_removed++;
253                     }
254                 }
255             }
256         }
257
258         if(num_entries_removed > 0)
259             if(log.isInfoEnabled()) log.info("done (removed " + num_entries_removed + " entries)");
260     }
261
262
263     /* -------------------------------- End of Private methods ----------------------------------- */
264
265
266
267     /**
268      * Maintains the member address plus a timestamp. Used by CacheCleaner thread to remove old entries.
269      */

270     private static class Entry {
271         Address mbr=null;
272         long timestamp=0;
273
274         private Entry(Address mbr) {
275             this.mbr=mbr;
276             update();
277         }
278
279         void update() {
280             timestamp=System.currentTimeMillis();
281         }
282
283         public boolean equals(Object JavaDoc other) {
284             if(mbr != null && other != null && other instanceof Address)
285                 return mbr.equals(other);
286             return false;
287         }
288
289         public String JavaDoc toString() {
290             return "mbr=" + mbr;
291         }
292     }
293
294
295     /**
296      * Periodically sweeps the cache and removes old items (items that are older than EXPIRY_TIME msecs)
297      */

298     private class CacheCleaner extends TimerTask {
299
300         public void run() {
301             sweep();
302         }
303
304     }
305
306
307     public static void main(String JavaDoc[] args)
308             throws java.net.UnknownHostException JavaDoc {
309         String JavaDoc arg;
310         int port=7500;
311         long expiry_time=30000;
312         GossipServer gossip_server=null;
313         InetAddress JavaDoc address=null;
314         for(int i=0; i < args.length; i++) {
315             arg=args[i];
316             if("-help".equals(arg)) {
317                 System.out.println("GossipServer [-port <port>] [-expiry <msecs>] [-bindaddress <address>]");
318                 return;
319             }
320             if("-port".equals(arg)) {
321                 port=Integer.parseInt(args[++i]);
322                 continue;
323             }
324             if("-expiry".equals(arg)) {
325                 expiry_time=Long.parseLong(args[++i]);
326                 continue;
327             }
328             if("-bindaddress".equals(arg)) {
329                 address=InetAddress.getByName(args[++i]);
330                 continue;
331             }
332             System.out.println("GossipServer [-port <port>] [-expiry <msecs>]");
333             return;
334         }
335
336         try {
337
338         }
339         catch(Throwable JavaDoc ex) {
340             System.err.println("GossipServer.main(): " + ex);
341         }
342
343         try {
344             gossip_server=new GossipServer(port, expiry_time, address);
345             gossip_server.run();
346         }
347         catch(Exception JavaDoc e) {
348             System.err.println("GossipServer.main(): " + e);
349         }
350     }
351
352
353 }
354
Popular Tags