1 package org.jgroups.protocols; 2 3 import org.jgroups.Address; 4 import org.jgroups.Event; 5 import org.jgroups.Message; 6 import org.jgroups.View; 7 import org.jgroups.stack.Protocol; 8 import org.jgroups.util.BoundedList; 9 import org.jgroups.util.Util; 10 11 import java.net.InetAddress ; 12 import java.net.UnknownHostException ; 13 import java.util.Collection ; 14 import java.util.Properties ; 15 import java.util.Vector ; 16 17 21 public abstract class BasicTCP extends TP { 22 23 24 boolean skip_suspected_members=true; 25 26 27 boolean suspect_on_send_failure=false; 28 29 30 33 final BoundedList suspected_mbrs=new BoundedList(20); 34 protected InetAddress external_addr=null; protected int start_port=7800; protected int end_port=0; protected long reaper_interval=0; protected long conn_expire_time=0; 40 boolean use_send_queues=true; 41 int send_queue_size=10000; int recv_buf_size=150000; 43 int send_buf_size=150000; 44 int sock_conn_timeout=2000; boolean tcp_nodelay=false; 46 int linger=-1; 48 49 50 public int getStartPort() {return start_port;} 51 public void setStartPort(int start_port) {this.start_port=start_port;} 52 public int getEndPort() {return end_port;} 53 public void setEndPort(int end_port) {this.end_port=end_port;} 54 public long getReaperInterval() {return reaper_interval;} 55 public void setReaperInterval(long reaper_interval) {this.reaper_interval=reaper_interval;} 56 public long getConnExpireTime() {return conn_expire_time;} 57 public void setConnExpireTime(long conn_expire_time) {this.conn_expire_time=conn_expire_time;} 58 59 public boolean setProperties(Properties props) { 60 String str; 61 62 super.setProperties(props); 63 64 str=props.getProperty("start_port"); 65 if(str != null) { 66 start_port=Integer.parseInt(str); 67 props.remove("start_port"); 68 } 69 70 str=props.getProperty("end_port"); 71 if(str != null) { 72 end_port=Integer.parseInt(str); 73 props.remove("end_port"); 74 } 75 76 str=props.getProperty("external_addr"); 77 if(str != null) { 78 try { 79 external_addr=InetAddress.getByName(str); 80 } 81 catch(UnknownHostException unknown) { 82 if(log.isFatalEnabled()) log.fatal("(external_addr): host " + str + " not known"); 83 return false; 84 } 85 props.remove("external_addr"); 86 } 87 88 str=props.getProperty("reaper_interval"); 89 if(str != null) { 90 reaper_interval=Long.parseLong(str); 91 props.remove("reaper_interval"); 92 } 93 94 str=props.getProperty("conn_expire_time"); 95 if(str != null) { 96 conn_expire_time=Long.parseLong(str); 97 props.remove("conn_expire_time"); 98 } 99 100 str=props.getProperty("sock_conn_timeout"); 101 if(str != null) { 102 sock_conn_timeout=Integer.parseInt(str); 103 props.remove("sock_conn_timeout"); 104 } 105 106 str=props.getProperty("recv_buf_size"); 107 if(str != null) { 108 recv_buf_size=Integer.parseInt(str); 109 props.remove("recv_buf_size"); 110 } 111 112 str=props.getProperty("send_buf_size"); 113 if(str != null) { 114 send_buf_size=Integer.parseInt(str); 115 props.remove("send_buf_size"); 116 } 117 118 str=props.getProperty("skip_suspected_members"); 119 if(str != null) { 120 skip_suspected_members=Boolean.valueOf(str).booleanValue(); 121 props.remove("skip_suspected_members"); 122 } 123 124 str=props.getProperty("suspect_on_send_failure"); 125 if(str != null) { 126 suspect_on_send_failure=Boolean.valueOf(str).booleanValue(); 127 props.remove("suspect_on_send_failure"); 128 } 129 130 str=props.getProperty("use_send_queues"); 131 if(str != null) { 132 use_send_queues=Boolean.valueOf(str).booleanValue(); 133 props.remove("use_send_queues"); 134 } 135 136 str=props.getProperty("send_queue_size"); 137 if(str != null) { 138 send_queue_size=Integer.parseInt(str); 139 props.remove("send_queue_size"); 140 } 141 142 str=props.getProperty("tcp_nodelay"); 143 if(str != null) { 144 tcp_nodelay=Boolean.parseBoolean(str); 145 props.remove("tcp_nodelay"); 146 } 147 148 str=props.getProperty("linger"); 149 if(str != null) { 150 linger=Integer.parseInt(str); 151 props.remove("linger"); 152 } 153 154 155 Util.checkBufferSize(getName() + ".recv_buf_size", recv_buf_size); 156 Util.checkBufferSize(getName() + ".send_buf_size", send_buf_size); 157 158 return true; 159 } 160 161 public void init() throws Exception { 162 super.init(); 163 if(start_port <= 0) { 164 Protocol dynamic_discovery_prot=stack.findProtocol("MPING"); 165 if(dynamic_discovery_prot == null) 166 dynamic_discovery_prot=stack.findProtocol("TCPGOSSIP"); 167 168 if(dynamic_discovery_prot != null) { 169 if(log.isDebugEnabled()) 170 log.debug("dynamic discovery is present (" + dynamic_discovery_prot + "), so start_port=" + start_port + " is okay"); 171 } 172 else { 173 throw new IllegalArgumentException ("start_port cannot be set to " + start_port + 174 ", as no dynamic discovery protocol (e.g. MPING or TCPGOSSIP) has been detected."); 175 } 176 } 177 } 178 179 180 181 public void sendToAllMembers(byte[] data, int offset, int length) throws Exception { 182 Address dest; 183 Vector mbrs=(Vector )members.clone(); 184 for(int i=0; i < mbrs.size(); i++) { 185 dest=(Address)mbrs.elementAt(i); 186 sendToSingleMember(dest, data, offset, length); 187 } 188 } 189 190 public void sendToSingleMember(Address dest, byte[] data, int offset, int length) throws Exception { 191 if(log.isTraceEnabled()) log.trace("dest=" + dest + " (" + length + " bytes)"); 192 if(skip_suspected_members) { 193 if(suspected_mbrs.contains(dest)) { 194 if(log.isTraceEnabled()) 195 log.trace("will not send unicast message to " + dest + " as it is currently suspected"); 196 return; 197 } 198 } 199 200 try { 201 send(dest, data, offset, length); 202 } 203 catch(Exception e) { 204 if(log.isTraceEnabled()) 205 log.trace("failure sending message to " + dest, e); 206 if(suspect_on_send_failure && members.contains(dest)) { 207 if(!suspected_mbrs.contains(dest)) { 208 suspected_mbrs.add(dest); 209 up_prot.up(new Event(Event.SUSPECT, dest)); 210 } 211 } 212 } 213 } 214 215 public String getInfo() { 216 StringBuilder sb=new StringBuilder (); 217 sb.append("connections: ").append(printConnections()).append("\n"); 218 return sb.toString(); 219 } 220 221 public void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast) { 222 if(multicast) 223 msg.setDest(null); 224 else 225 msg.setDest(dest); 226 } 227 228 public void postUnmarshallingList(Message msg, Address dest, boolean multicast) { 229 postUnmarshalling(msg, dest, null, multicast); 230 } 231 232 public abstract String printConnections(); 233 234 public abstract void send(Address dest, byte[] data, int offset, int length) throws Exception ; 235 236 public abstract void retainAll(Collection members); 237 238 239 public void receive(Address sender, byte[] data, int offset, int length) { 240 receive(local_addr, sender, data, offset, length); 241 } 242 243 protected Object handleDownEvent(Event evt) { 244 Object ret=super.handleDownEvent(evt); 245 if(evt.getType() == Event.VIEW_CHANGE) { 246 suspected_mbrs.removeAll(); 247 View v=(View)evt.getArg(); 248 Vector tmp_mbrs=v != null? v.getMembers() : null; 249 if(tmp_mbrs != null) { 250 retainAll(tmp_mbrs); } 252 } 253 else if(evt.getType() == Event.UNSUSPECT) { 254 suspected_mbrs.removeElement(evt.getArg()); 255 } 256 return ret; 257 } 258 } 259 | Popular Tags |