1 3 package org.jgroups.stack; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.Address; 9 10 import java.io.ObjectInputStream ; 11 import java.io.ObjectOutputStream ; 12 import java.net.InetAddress ; 13 import java.net.Socket ; 14 import java.util.*; 15 16 17 24 public class GossipClient { 25 Timer timer=new Timer(); 26 final Hashtable groups=new Hashtable(); Refresher refresher_task=new Refresher(); 28 final Vector gossip_servers=new Vector(); boolean timer_running=false; 30 long EXPIRY_TIME=20000; 32 protected final Log log=LogFactory.getLog(this.getClass()); 33 34 35 40 public GossipClient(IpAddress gossip_host, long expiry) { 41 init(gossip_host, expiry); 42 } 43 44 45 50 public GossipClient(Vector gossip_hosts, long expiry) { 51 if(gossip_hosts == null) { 52 if(log.isErrorEnabled()) log.error("empty set of GossipServers given"); 53 return; 54 } 55 for(int i=0; i < gossip_hosts.size(); i++) 56 init((IpAddress) gossip_hosts.elementAt(i), expiry); 57 } 58 59 60 public void stop() { 61 timer_running=false; 62 timer.cancel(); 63 groups.clear(); 64 timer=new Timer(); 66 refresher_task=new Refresher(); 67 68 } 69 70 71 74 public void addGossipServer(IpAddress gossip_host) { 75 if(!gossip_servers.contains(gossip_host)) 76 gossip_servers.addElement(gossip_host); 77 } 78 79 80 85 public void register(String group, Address mbr) { 86 Vector mbrs; 87 88 if(group == null || mbr == null) { 89 if(log.isErrorEnabled()) log.error("group or mbr is null"); 90 return; 91 } 92 mbrs=(Vector) groups.get(group); 93 if(mbrs == null) { 94 mbrs=new Vector(); 95 mbrs.addElement(mbr); 96 groups.put(group, mbrs); 97 } 98 else { 99 if(!mbrs.contains(mbr)) 100 mbrs.addElement(mbr); 101 } 102 103 _register(group, mbr); 105 if(!timer_running) { 106 timer.schedule(refresher_task, EXPIRY_TIME, EXPIRY_TIME); 107 timer_running=true; 108 } 109 } 110 111 112 117 public Vector getMembers(String group) { 118 if(group == null) { 119 if(log.isErrorEnabled()) log.error("group is null"); 120 return null; 121 } 122 123 return _getMembers(group); 124 } 125 126 127 128 129 130 131 void init(IpAddress gossip_host, long expiry) { 132 EXPIRY_TIME=expiry; 133 addGossipServer(gossip_host); 134 } 135 136 137 141 void _register(String group, Address mbr) { 142 Socket sock; 143 ObjectOutputStream out; 144 IpAddress entry; 145 GossipData gossip_req; 146 147 for(int i=0; i < gossip_servers.size(); i++) { 148 entry=(IpAddress) gossip_servers.elementAt(i); 149 if(entry.getIpAddress() == null || entry.getPort() == 0) { 150 if(log.isErrorEnabled()) log.error("entry.host or entry.port is null"); 151 continue; 152 } 153 try { 154 if(log.isTraceEnabled()) 155 log.trace("REGISTER_REQ --> " + entry.getIpAddress() + ':' + entry.getPort()); 156 sock=new Socket (entry.getIpAddress(), entry.getPort()); 157 out=new ObjectOutputStream (sock.getOutputStream()); 158 gossip_req=new GossipData(GossipData.REGISTER_REQ, group, mbr, null); 159 out.writeObject(gossip_req); 162 out.flush(); 163 sock.close(); 164 } 165 catch(Exception ex) { 166 if(log.isErrorEnabled()) log.error("exception connecting to host " + entry + ": " + ex); 167 } 168 } 169 } 170 171 174 Vector _getMembers(String group) { 175 Vector ret=new Vector(); 176 Socket sock; 177 ObjectOutputStream out; 178 ObjectInputStream in; 179 IpAddress entry; 180 GossipData gossip_req, gossip_rsp; 181 Address mbr; 182 183 for(int i=0; i < gossip_servers.size(); i++) { 184 entry=(IpAddress) gossip_servers.elementAt(i); 185 if(entry.getIpAddress() == null || entry.getPort() == 0) { 186 if(log.isErrorEnabled()) log.error("entry.host or entry.port is null"); 187 continue; 188 } 189 try { 190 191 if(log.isTraceEnabled()) log.trace("GET_REQ --> " + entry.getIpAddress() + ':' + entry.getPort()); 192 sock=new Socket (entry.getIpAddress(), entry.getPort()); 193 out=new ObjectOutputStream (sock.getOutputStream()); 194 195 gossip_req=new GossipData(GossipData.GET_REQ, group, null, null); 196 out.writeObject(gossip_req); 199 out.flush(); 200 201 in=new ObjectInputStream (sock.getInputStream()); 202 gossip_rsp=(GossipData) in.readObject(); 203 if(gossip_rsp.mbrs != null) { for(int j=0; j < gossip_rsp.mbrs.size(); j++) { 205 mbr=(Address) gossip_rsp.mbrs.elementAt(j); 206 if(!ret.contains(mbr)) 207 ret.addElement(mbr); 208 } 209 } 210 211 212 sock.close(); 213 } 214 catch(Exception ex) { 215 if(log.isErrorEnabled()) log.error("exception connecting to host " + entry + ": " + ex); 216 } 217 } 218 219 return ret; 220 } 221 222 223 224 225 226 229 private class Refresher extends TimerTask { 230 231 public void run() { 232 int num_items=0; 233 String group; 234 Vector mbrs; 235 Address mbr; 236 237 if(log.isTraceEnabled()) log.trace("refresher task is run"); 238 for(Enumeration e=groups.keys(); e.hasMoreElements();) { 239 group=(String ) e.nextElement(); 240 mbrs=(Vector) groups.get(group); 241 if(mbrs != null) { 242 for(int i=0; i < mbrs.size(); i++) { 243 mbr=(Address) mbrs.elementAt(i); 244 if(log.isTraceEnabled()) log.trace("registering " + group + " : " + mbr); 245 register(group, mbr); 246 num_items++; 247 } 248 } 249 } 250 if(log.isTraceEnabled()) log.trace("refresher task done. Registered " + num_items + " items"); 251 } 252 253 } 254 255 256 public static void main(String [] args) { 257 Vector gossip_hosts=new Vector(); 258 String host; 259 InetAddress ip_addr; 260 int port; 261 boolean get=false, register=false, keep_running=false; 262 String register_host=null; 263 int register_port=0; 264 String get_group=null, register_group=null; 265 GossipClient gossip_client=null; 266 Vector mbrs; 267 long expiry=20000; 268 269 270 for(int i=0; i < args.length; i++) { 271 if("-help".equals(args[i])) { 272 usage(); 273 return; 274 } 275 if("-expiry".equals(args[i])) { 276 expiry=Long.parseLong(args[++i]); 277 continue; 278 } 279 if("-host".equals(args[i])) { 280 host=args[++i]; 281 port=Integer.parseInt(args[++i]); 282 try { 283 ip_addr=InetAddress.getByName(host); 284 gossip_hosts.addElement(new IpAddress(ip_addr, port)); 285 } 286 catch(Exception ex) { 287 System.err.println(ex); 288 } 289 continue; 290 } 291 if("-keep_running".equals(args[i])) { 292 keep_running=true; 293 continue; 294 } 295 if("-get".equals(args[i])) { 296 get=true; 297 get_group=args[++i]; 298 continue; 299 } 300 if("-register".equals(args[i])) { 301 register_group=args[++i]; 302 register_host=args[++i]; 303 register_port=Integer.parseInt(args[++i]); 304 register=true; 305 continue; 306 } 307 usage(); 308 return; 309 } 310 311 if(gossip_hosts.size() == 0) { 312 System.err.println("At least 1 GossipServer has to be given"); 313 return; 314 } 315 316 if(!register && !get) { 317 System.err.println("Neither get nor register command given, will not do anything"); 318 return; 319 } 320 321 try { 322 323 } 324 catch(Throwable ex) { 325 System.err.println("GossipClient.main(): error initailizing JGroups Trace: " + ex); 326 } 327 328 try { 329 gossip_client=new GossipClient(gossip_hosts, expiry); 330 if(register) { 331 System.out.println("Registering " + register_group + " --> " + register_host + ':' + register_port); 332 gossip_client.register(register_group, new IpAddress(register_host, register_port)); 333 } 334 335 if(get) { 336 System.out.println("Getting members for group " + get_group); 337 mbrs=gossip_client.getMembers(get_group); 338 System.out.println("Members for group " + get_group + " are " + mbrs); 339 } 340 } 341 catch(Exception ex) { 342 System.err.println(ex); 343 } 344 if(!keep_running) 345 gossip_client.stop(); 346 } 347 348 349 static void usage() { 350 System.out.println("GossipClient [-help] [-host <hostname> <port>]+ " + 351 " [-get <groupname>] [-register <groupname hostname port>] [-expiry <msecs>] " + 352 "[-keep_running]]"); 353 } 354 355 } 356 | Popular Tags |