1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.Address; 7 import org.jgroups.Event; 8 import org.jgroups.Message; 9 import org.jgroups.View; 10 import org.jgroups.blocks.ConnectionTable; 11 import org.jgroups.stack.IpAddress; 12 import org.jgroups.stack.Protocol; 13 import org.jgroups.util.BoundedList; 14 import org.jgroups.util.Util; 15 16 import java.net.InetAddress ; 17 import java.net.SocketException ; 18 import java.net.UnknownHostException ; 19 import java.util.HashMap ; 20 import java.util.Properties ; 21 import java.util.Vector ; 22 23 24 25 26 37 public class TCP extends Protocol implements ConnectionTable.Receiver { 38 private ConnectionTable ct=null; 39 protected Address local_addr=null; 40 private String group_addr=null; 41 private InetAddress bind_addr=null; private InetAddress external_addr=null; private int start_port=7800; private int end_port=0; private final Vector members=new Vector (11); 46 private long reaper_interval=0; private long conn_expire_time=0; boolean loopback=false; 50 52 byte[] additional_data=null; 53 54 57 final BoundedList suspected_mbrs=new BoundedList(20); 58 59 60 boolean skip_suspected_members=true; 61 62 int recv_buf_size=150000; 63 int send_buf_size=150000; 64 int sock_conn_timeout=2000; 66 static final String name="TCP"; 67 static final String IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address"; 68 69 70 public TCP() { 71 } 72 73 public String toString() { 74 return "Protocol TCP(local address: " + local_addr + ')'; 75 } 76 77 public String getName() { 78 return "TCP"; 79 } 80 81 82 protected final Vector getMembers() { 83 return members; 84 } 85 86 90 public void startUpHandler() { 91 ; 92 } 93 94 95 public void start() throws Exception { 96 ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port); 97 ct.setReceiveBufferSize(recv_buf_size); 99 ct.setSendBufferSize(send_buf_size); 100 ct.setSocketConnectionTimeout(sock_conn_timeout); 101 local_addr=ct.getLocalAddress(); 102 if(additional_data != null && local_addr instanceof IpAddress) 103 ((IpAddress)local_addr).setAdditionalData(additional_data); 104 passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 105 } 106 107 117 protected ConnectionTable getConnectionTable(long reaperInterval, long connExpireTime, InetAddress bindAddress, 118 InetAddress externalAddress, int startPort, int endPort) throws Exception { 119 ConnectionTable cTable=null; 120 if(reaperInterval == 0 && connExpireTime == 0) { 121 cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort); 122 } 123 else { 124 if(reaperInterval == 0) { 125 reaperInterval=5000; 126 if(log.isWarnEnabled()) log.warn("reaper_interval was 0, set it to " + reaperInterval); 127 } 128 if(connExpireTime == 0) { 129 connExpireTime=1000 * 60 * 5; 130 if(log.isWarnEnabled()) log.warn("conn_expire_time was 0, set it to " + connExpireTime); 131 } 132 cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort, 133 reaperInterval, connExpireTime); 134 } 135 return cTable; 136 } 137 138 public void stop() { 139 ct.stop(); 140 } 141 142 143 146 public void down(Event evt) { 147 Message msg; 148 Object dest_addr; 149 150 if(evt.getType() != Event.MSG) { 151 handleDownEvent(evt); 152 return; 153 } 154 155 msg=(Message)evt.getArg(); 156 157 if(group_addr != null) { 159 msg.putHeader(name, new TcpHeader(group_addr)); 160 } 161 162 dest_addr=msg.getDest(); 163 164 165 167 if(observer != null) 168 observer.passDown(evt); 169 170 if(dest_addr == null) { if(group_addr == null) { 172 if(log.isWarnEnabled()) log.warn("dest address of message is null, and " + 173 "sending to default address fails as group_addr is null, too !" + 174 " Discarding message."); 175 return; 176 } 177 else { 178 sendMulticastMessage(msg); } 180 } 181 else { 182 sendUnicastMessage(msg); } 184 } 185 186 187 188 public void receive(Message msg) { 189 TcpHeader hdr=null; 190 Event evt=new Event(Event.MSG, msg); 191 192 193 195 if(observer != null) 196 observer.up(evt, up_queue.size()); 197 198 if(log.isTraceEnabled()) log.trace("received msg " + msg); 199 200 hdr=(TcpHeader)msg.removeHeader(name); 201 202 if(hdr != null) { 203 204 String ch_name=null; 205 206 if(hdr.group_addr != null) 207 ch_name=hdr.group_addr; 208 209 211 216 if(ch_name != null && !group_addr.equals(ch_name) && 219 !ch_name.equals(Util.DIAG_GROUP)) { 220 if(log.isWarnEnabled()) log.warn("discarded message from different group (" + 221 ch_name + "). Sender was " + msg.getSrc()); 222 return; 223 } 224 } 225 226 passUp(evt); 227 } 228 229 230 240 241 242 public boolean setProperties(Properties props) { 243 String str, tmp=null; 244 245 super.setProperties(props); 246 str=props.getProperty("start_port"); 247 if(str != null) { 248 start_port=Integer.parseInt(str); 249 props.remove("start_port"); 250 } 251 252 str=props.getProperty("end_port"); 253 if(str != null) { 254 end_port=Integer.parseInt(str); 255 props.remove("end_port"); 256 } 257 258 try { 260 tmp=System.getProperty("bind.address"); if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) { 262 tmp=null; 263 } 264 } 265 catch (SecurityException ex){ 266 } 267 268 if(tmp != null) 269 str=tmp; 270 else 271 str=props.getProperty("bind_addr"); 272 if(str != null) { 273 try { 274 bind_addr=InetAddress.getByName(str); 275 } 276 catch(UnknownHostException unknown) { 277 if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known"); 278 return false; 279 } 280 props.remove("bind_addr"); 281 } 282 283 str=props.getProperty("external_addr"); 284 if(str != null) { 285 try { 286 external_addr=InetAddress.getByName(str); 287 } 288 catch(UnknownHostException unknown) { 289 if(log.isFatalEnabled()) log.fatal("(external_addr): host " + str + " not known"); 290 return false; 291 } 292 props.remove("external_addr"); 293 } 294 295 str=props.getProperty("reaper_interval"); 296 if(str != null) { 297 reaper_interval=Long.parseLong(str); 298 props.remove("reaper_interval"); 299 } 300 301 str=props.getProperty("conn_expire_time"); 302 if(str != null) { 303 conn_expire_time=Long.parseLong(str); 304 props.remove("conn_expire_time"); 305 } 306 307 str=props.getProperty("sock_conn_timeout"); 308 if(str != null) { 309 sock_conn_timeout=Integer.parseInt(str); 310 props.remove("sock_conn_timeout"); 311 } 312 313 str=props.getProperty("recv_buf_size"); 314 if(str != null) { 315 recv_buf_size=Integer.parseInt(str); 316 props.remove("recv_buf_size"); 317 } 318 319 str=props.getProperty("send_buf_size"); 320 if(str != null) { 321 send_buf_size=Integer.parseInt(str); 322 props.remove("send_buf_size"); 323 } 324 325 str=props.getProperty("loopback"); 326 if(str != null) { 327 loopback=Boolean.valueOf(str).booleanValue(); 328 props.remove("loopback"); 329 } 330 331 str=props.getProperty("skip_suspected_members"); 332 if(str != null) { 333 skip_suspected_members=Boolean.valueOf(str).booleanValue(); 334 props.remove("skip_suspected_members"); 335 } 336 337 if(props.size() > 0) { 338 System.err.println("TCP.setProperties(): the following properties are not recognized:"); 339 props.list(System.out); 340 return false; 341 } 342 return true; 343 } 344 345 346 352 private void setSourceAddress(Message msg) { 353 if(msg.getSrc() == null) 354 msg.setSrc(local_addr); 355 } 356 357 358 359 private void sendUnicastMessage(Message msg) { 360 IpAddress dest; 361 Message copy; 362 Object hdr; 363 Event evt; 364 365 dest=(IpAddress)msg.getDest(); if(!(dest instanceof IpAddress)) { 367 if(log.isErrorEnabled()) log.error("destination address is not of type IpAddress !"); 368 return; 369 } 370 setSourceAddress(msg); 371 372 373 if(loopback && local_addr != null && dest != null && dest.equals(local_addr)) { 374 copy=msg.copy(); 375 hdr=copy.getHeader(name); 376 if(hdr != null && hdr instanceof TcpHeader) 377 copy.removeHeader(name); 378 copy.setSrc(local_addr); 379 copy.setDest(local_addr); 380 381 evt=new Event(Event.MSG, copy); 382 383 385 if(observer != null) 386 observer.up(evt, up_queue.size()); 387 388 passUp(evt); 389 return; 390 } 391 if(log.isTraceEnabled()) log.trace("dest=" + msg.getDest() + ", hdrs:\n" + msg.printObjectHeaders()); 392 try { 393 if(skip_suspected_members) { 394 if(suspected_mbrs.contains(dest)) { 395 if(log.isTraceEnabled()) log.trace("will not send unicast message to " + dest + 396 " as it is currently suspected"); 397 return; 398 } 399 } 400 ct.send(msg); 401 } 402 catch(SocketException e) { 403 if(members.contains(dest)) { 404 if(!suspected_mbrs.contains(dest)) { 405 suspected_mbrs.add(dest); 406 passUp(new Event(Event.SUSPECT, dest)); 407 } 408 } 409 } 410 } 411 412 413 protected void sendMulticastMessage(Message msg) { 414 Address dest; 415 Vector mbrs=(Vector )members.clone(); 416 for(int i=0; i < mbrs.size(); i++) { 417 dest=(Address)mbrs.elementAt(i); 418 msg.setDest(dest); 419 sendUnicastMessage(msg); 420 } 421 } 422 423 424 protected void handleDownEvent(Event evt) { 425 switch(evt.getType()) { 426 427 case Event.TMP_VIEW: 428 case Event.VIEW_CHANGE: 429 suspected_mbrs.removeAll(); 430 synchronized(members) { 431 members.clear(); 432 members.addAll(((View)evt.getArg()).getMembers()); 433 } 434 break; 435 436 case Event.GET_LOCAL_ADDRESS: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 438 break; 439 440 case Event.CONNECT: 441 group_addr=(String )evt.getArg(); 442 443 passUp(new Event(Event.CONNECT_OK)); 446 break; 447 448 case Event.DISCONNECT: 449 passUp(new Event(Event.DISCONNECT_OK)); 450 break; 451 452 case Event.CONFIG: 453 if(log.isTraceEnabled()) log.trace("received CONFIG event: " + evt.getArg()); 454 handleConfigEvent((HashMap )evt.getArg()); 455 break; 456 } 457 } 458 459 460 void handleConfigEvent(HashMap map) { 461 if(map == null) return; 462 if(map.containsKey("additional_data")) 463 additional_data=(byte[])map.get("additional_data"); 464 } 465 466 467 } 468 | Popular Tags |