1 3 package org.jgroups.protocols; 4 5 import org.jgroups.Address; 6 import org.jgroups.Event; 7 import org.jgroups.Message; 8 import org.jgroups.stack.GossipClient; 9 import org.jgroups.stack.IpAddress; 10 11 import java.util.Properties ; 12 import java.util.StringTokenizer ; 13 import java.util.Vector ; 14 15 16 28 public class TCPGOSSIP extends Discovery { 29 Vector initial_hosts=null; GossipClient gossip_client=null; 32 long gossip_refresh_rate=20000; 35 36 final static Vector EMPTY_VECTOR=new Vector (); 37 final static String name="TCPGOSSIP"; 38 39 40 public String getName() { 41 return name; 42 } 43 44 45 46 public boolean setProperties(Properties props) { 47 String str; 48 str=props.getProperty("gossip_refresh_rate"); if(str != null) { 50 gossip_refresh_rate=Integer.parseInt(str); 51 props.remove("gossip_refresh_rate"); 52 } 53 54 str=props.getProperty("initial_hosts"); 55 if(str != null) { 56 props.remove("initial_hosts"); 57 initial_hosts=createInitialHosts(str); 58 } 59 60 if(initial_hosts == null || initial_hosts.size() == 0) { 61 if(log.isErrorEnabled()) log.error("initial_hosts must contain the address of at least one GossipServer"); 62 return false; 63 } 64 return super.setProperties(props); 65 } 66 67 68 69 public void start() throws Exception { 70 super.start(); 71 if(gossip_client == null) 72 gossip_client=new GossipClient(initial_hosts, gossip_refresh_rate); 73 } 74 75 public void stop() { 76 super.stop(); 77 if(gossip_client != null) { 78 gossip_client.stop(); 79 gossip_client=null; 80 } 81 } 82 83 84 public void handleConnectOK() { 85 if(group_addr == null || local_addr == null) { 86 if(log.isErrorEnabled()) 87 log.error("[CONNECT_OK]: group_addr or local_addr is null. " + 88 "cannot register with GossipServer(s)"); 89 } 90 else { 91 if(log.isTraceEnabled()) 92 log.trace("[CONNECT_OK]: registering " + local_addr + 93 " under " + group_addr + " with GossipServer"); 94 gossip_client.register(group_addr, local_addr); 95 } 96 } 97 98 public void sendGetMembersRequest() { 99 Message msg, copy; 100 PingHeader hdr; 101 Vector tmp_mbrs; 102 Address mbr_addr; 103 104 if(group_addr == null) { 105 if(log.isErrorEnabled()) log.error("[FIND_INITIAL_MBRS]: group_addr is null, cannot get mbrship"); 106 passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR)); 107 return; 108 } 109 if(log.isTraceEnabled()) log.trace("fetching members from GossipServer(s)"); 110 tmp_mbrs=gossip_client.getMembers(group_addr); 111 if(tmp_mbrs == null || tmp_mbrs.size() == 0) { 112 if(log.isErrorEnabled()) log.error("[FIND_INITIAL_MBRS]: gossip client found no members"); 113 passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR)); 114 } 115 if(log.isTraceEnabled()) log.trace("consolidated mbrs from GossipServer(s) are " + tmp_mbrs); 116 117 hdr=new PingHeader(PingHeader.GET_MBRS_REQ, null); 119 msg=new Message(null, null, null); 120 msg.putHeader(name, hdr); 121 122 for(int i=0; i < tmp_mbrs.size(); i++) { 123 mbr_addr=(IpAddress)tmp_mbrs.elementAt(i); 124 copy=msg.copy(); 125 copy.setDest(mbr_addr); 126 if(log.isTraceEnabled()) log.trace("[FIND_INITIAL_MBRS] sending PING request to " + copy.getDest()); 127 passDown(new Event(Event.MSG, copy)); 128 } 129 } 130 131 132 133 134 135 136 139 private Vector createInitialHosts(String l) { 140 Vector tmp=new Vector (); 141 String host; 142 int port; 143 IpAddress addr; 144 StringTokenizer tok=new StringTokenizer (l, ","); 145 String t; 146 147 while(tok.hasMoreTokens()) { 148 try { 149 t=tok.nextToken(); 150 host=t.substring(0, t.indexOf('[')); 151 port=Integer.parseInt(t.substring(t.indexOf('[') + 1, t.indexOf(']'))); 152 addr=new IpAddress(host, port); 153 tmp.addElement(addr); 154 } 155 catch(NumberFormatException e) { 156 if(log.isErrorEnabled()) log.error("exeption is " + e); 157 } 158 } 159 160 return tmp; 161 } 162 163 164 } 165 166 | Popular Tags |