1 3 package org.jgroups.stack; 4 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 import org.jgroups.Address; 8 9 import java.io.ObjectInputStream ; 10 import java.io.ObjectOutputStream ; 11 import java.net.InetAddress ; 12 import java.net.ServerSocket ; 13 import java.net.Socket ; 14 import java.util.*; 15 16 17 32 public class GossipServer { 33 final Hashtable groups=new Hashtable(); int port=7500; 35 ServerSocket srv_sock=null; 36 long EXPIRY_TIME=30000; CacheCleaner cache_cleaner=null; final Timer timer=new Timer(true); InetAddress bind_address=null; 40 protected final Log log=LogFactory.getLog(getClass()); 41 42 43 public GossipServer(int port) throws Exception { 44 this.port=port; 45 init(); 46 } 47 48 49 public GossipServer(int port, long expiry_time) throws Exception { 50 this.port=port; 51 EXPIRY_TIME=expiry_time; 52 init(); 53 } 54 55 public GossipServer(int port, long expiry_time, InetAddress bind_address) throws Exception { 56 this.port=port; 57 this.bind_address=bind_address; 58 EXPIRY_TIME=expiry_time; 59 init(); 60 } 61 62 63 public void run() { 64 Socket sock; 65 ObjectInputStream input; 66 ObjectOutputStream output=null; 67 GossipData gossip_req, gossip_rsp; 68 boolean looping=true; 69 70 while(looping) { 71 try { 72 sock=srv_sock.accept(); 73 if(log.isInfoEnabled()) log.info("accepted connection from " + sock.getInetAddress() + 74 ':' + sock.getPort()); 75 sock.setSoLinger(true, 500); 76 input=new ObjectInputStream (sock.getInputStream()); 77 gossip_req=(GossipData) input.readObject(); 78 gossip_rsp=processGossip(gossip_req); 79 if(gossip_rsp != null) { 80 output=new ObjectOutputStream (sock.getOutputStream()); 81 output.writeObject(gossip_rsp); 82 output.flush(); 83 output.close(); 84 } 85 input.close(); 86 sock.close(); 87 looping=false; 88 } 89 catch(Exception ex) { 90 if(log.isErrorEnabled()) log.error("exception=" + ex); 91 ex.printStackTrace(); continue; 93 } 94 } 95 } 96 97 98 99 100 101 102 void init() throws Exception { 103 if(bind_address == null) { 104 srv_sock=new ServerSocket (port, 20); bind_address=srv_sock.getInetAddress(); 106 } 107 else 108 srv_sock=new ServerSocket (port, 20, bind_address); { 110 if(log.isInfoEnabled()) log.info("GossipServer was created at " + new Date()); 111 if(log.isInfoEnabled()) log.info("Listening on port " + port + " bound on address " + bind_address); 112 } 113 cache_cleaner=new CacheCleaner(); 114 timer.schedule(cache_cleaner, EXPIRY_TIME, EXPIRY_TIME); 115 } 116 117 118 121 GossipData processGossip(GossipData gossip) { 122 String group; 123 Address mbr; 124 125 if(gossip == null) return null; 126 if(log.isInfoEnabled()) log.info(gossip.toString()); 127 switch(gossip.getType()) { 128 case GossipData.REGISTER_REQ: 129 group=gossip.getGroup(); 130 mbr=gossip.getMbr(); 131 if(group == null || mbr == null) { 132 if(log.isErrorEnabled()) log.error("group or member is null, cannot register member"); 133 return null; 134 } 135 return processRegisterRequest(group, mbr); 136 137 case GossipData.GET_REQ: 138 group=gossip.getGroup(); 139 if(group == null) { 140 if(log.isErrorEnabled()) log.error("group is null, cannot get membership"); 141 return null; 142 } 143 return processGetRequest(group); 144 145 case GossipData.GET_RSP: if(log.isWarnEnabled()) log.warn("received a GET_RSP. Should not be received by server"); 147 return null; 148 149 default: 150 if(log.isWarnEnabled()) log.warn("received unkown gossip request (gossip=" + gossip + ')'); 151 return null; 152 } 153 } 154 155 156 GossipData processRegisterRequest(String group, Address mbr) { 157 addMember(group, mbr); 158 return null; 159 } 160 161 162 GossipData processGetRequest(String group) { 163 GossipData ret=null; 164 Vector mbrs=getMembers(group); 165 166 ret=new GossipData(GossipData.GET_RSP, group, null, mbrs); 167 168 if(log.isInfoEnabled()) log.info("members are " + mbrs + 169 ", gossip_rsp=" + ret); 170 return ret; 171 } 172 173 174 180 void addMember(String group, Address mbr) { 181 Vector mbrs=(Vector) groups.get(group); 182 Entry entry; 183 184 if(mbrs == null) { 185 mbrs=new Vector(); 186 mbrs.addElement(new Entry(mbr)); 187 groups.put(group, mbrs); 188 if(log.isInfoEnabled()) log.info("added " + mbr + " to " + group + " (new group)"); 189 } 190 else { 191 entry=findEntry(mbrs, mbr); 192 if(entry == null) { 193 entry=new Entry(mbr); 194 mbrs.addElement(entry); 195 if(log.isInfoEnabled()) log.info("added " + mbr + " to " + group); 196 } 197 else { 198 entry.update(); 199 if(log.isInfoEnabled()) log.info("updated entry " + entry); 200 } 201 } 202 } 203 204 205 Vector getMembers(String group) { 206 Vector ret=null; 207 Vector mbrs=(Vector) groups.get(group); 208 209 if(mbrs == null) 210 return null; 211 ret=new Vector(); 212 for(int i=0; i < mbrs.size(); i++) 213 ret.addElement(((Entry) mbrs.elementAt(i)).mbr); 214 return ret; 215 } 216 217 218 Entry findEntry(Vector mbrs, Address mbr) { 219 Entry entry=null; 220 221 for(int i=0; i < mbrs.size(); i++) { 222 entry=(Entry) mbrs.elementAt(i); 223 if(entry.mbr != null && entry.mbr.equals(mbr)) 224 return entry; 225 } 226 return null; 227 } 228 229 230 233 void sweep() { 234 long current_time=System.currentTimeMillis(), diff; 235 int num_entries_removed=0; 236 String key=null; 237 Vector val; 238 Entry entry; 239 240 for(Enumeration e=groups.keys(); e.hasMoreElements();) { 241 key=(String ) e.nextElement(); 242 val=(Vector) groups.get(key); 243 if(val != null) { 244 for(Iterator it=val.listIterator(); it.hasNext();) { 245 entry=(Entry) it.next(); 246 diff=current_time - entry.timestamp; 247 if(entry.timestamp + EXPIRY_TIME < current_time) { 248 it.remove(); 249 250 if(log.isInfoEnabled()) log.info("removed member " + entry + 251 " from group " + key + '(' + diff + " msecs old)"); 252 num_entries_removed++; 253 } 254 } 255 } 256 } 257 258 if(num_entries_removed > 0) 259 if(log.isInfoEnabled()) log.info("done (removed " + num_entries_removed + " entries)"); 260 } 261 262 263 264 265 266 267 270 private static class Entry { 271 Address mbr=null; 272 long timestamp=0; 273 274 private Entry(Address mbr) { 275 this.mbr=mbr; 276 update(); 277 } 278 279 void update() { 280 timestamp=System.currentTimeMillis(); 281 } 282 283 public boolean equals(Object other) { 284 if(mbr != null && other != null && other instanceof Address) 285 return mbr.equals(other); 286 return false; 287 } 288 289 public String toString() { 290 return "mbr=" + mbr; 291 } 292 } 293 294 295 298 private class CacheCleaner extends TimerTask { 299 300 public void run() { 301 sweep(); 302 } 303 304 } 305 306 307 public static void main(String [] args) 308 throws java.net.UnknownHostException { 309 String arg; 310 int port=7500; 311 long expiry_time=30000; 312 GossipServer gossip_server=null; 313 InetAddress address=null; 314 for(int i=0; i < args.length; i++) { 315 arg=args[i]; 316 if("-help".equals(arg)) { 317 System.out.println("GossipServer [-port <port>] [-expiry <msecs>] [-bindaddress <address>]"); 318 return; 319 } 320 if("-port".equals(arg)) { 321 port=Integer.parseInt(args[++i]); 322 continue; 323 } 324 if("-expiry".equals(arg)) { 325 expiry_time=Long.parseLong(args[++i]); 326 continue; 327 } 328 if("-bindaddress".equals(arg)) { 329 address=InetAddress.getByName(args[++i]); 330 continue; 331 } 332 System.out.println("GossipServer [-port <port>] [-expiry <msecs>]"); 333 return; 334 } 335 336 try { 337 338 } 339 catch(Throwable ex) { 340 System.err.println("GossipServer.main(): " + ex); 341 } 342 343 try { 344 gossip_server=new GossipServer(port, expiry_time, address); 345 gossip_server.run(); 346 } 347 catch(Exception e) { 348 System.err.println("GossipServer.main(): " + e); 349 } 350 } 351 352 353 } 354 | Popular Tags |