1 3 package org.jgroups.protocols; 4 5 6 import org.jgroups.*; 7 import org.jgroups.blocks.LogicalLink; 8 import org.jgroups.stack.Protocol; 9 import org.jgroups.util.List; 10 import org.jgroups.util.Util; 11 12 import java.io.IOException ; 13 import java.io.ObjectInput ; 14 import java.io.ObjectOutput ; 15 import java.net.InetAddress ; 16 import java.util.Enumeration ; 17 import java.util.Properties ; 18 import java.util.StringTokenizer ; 19 import java.util.Vector ; 20 21 22 23 30 public class WANPIPE extends Protocol implements LogicalLink.Receiver { 31 LogicalLink pipe=null; 32 String name=null; final List links=new List(); 35 Address local_addr=null; 36 String group_addr=null; 37 final Properties properties=null; 38 final Vector members=new Vector (); 39 40 41 42 public WANPIPE() { 43 pipe=new LogicalLink(this); 44 } 45 46 47 public String toString() { 48 return "Protocol WANPIPE(local address: " + local_addr + ')'; 49 } 50 51 52 public String getName() {return "WANPIPE";} 53 54 55 56 57 58 59 62 public void down(Event evt) { 63 Message msg, rsp, copy; 64 Address dest_addr; 65 66 if(evt.getType() != Event.MSG) { 67 handleDownEvent(evt); 68 return; 69 } 70 71 msg=(Message)evt.getArg(); 72 dest_addr=msg.getDest(); 73 74 if(dest_addr == null) { for(int i=0; i < members.size(); i++) { 76 dest_addr=(Address )members.elementAt(i); 77 78 if(dest_addr.equals(local_addr)) { returnLocal(msg); 80 } 81 else { copy=msg.copy(); 83 copy.setDest(dest_addr); 84 copy.putHeader(getName(), new WanPipeHeader(group_addr)); 85 sendUnicastMessage(copy); 86 } 87 } 88 } 89 else { 90 if(dest_addr.equals(local_addr)) { returnLocal(msg); 92 } 93 else { msg.putHeader(getName(), new WanPipeHeader(group_addr)); 95 sendUnicastMessage(msg); 96 } 97 } 98 } 99 100 101 102 void returnLocal(Message msg) { 103 Message rsp=msg.copy(); 104 rsp.setDest(local_addr); 105 rsp.setSrc(local_addr); 106 passUp(new Event(Event.MSG, rsp)); 107 } 108 109 110 111 112 public void start() throws Exception { 113 LinkInfo l; 114 115 for(Enumeration e=links.elements(); e.hasMoreElements();) { 116 l=(LinkInfo)e.nextElement(); 117 pipe.addLink(l.local_addr, l.local_port, l.remote_addr, l.remote_port); 118 } 119 pipe.start(); 120 local_addr=new WanPipeAddress(name); passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 122 } 123 124 125 public void stop() { 126 pipe.stop(); 127 pipe.removeAllLinks(); 128 } 129 130 131 132 133 134 135 136 public void receive(byte[] buf) { 138 WanPipeHeader hdr=null; 139 Message msg=null; 140 141 try { 142 msg=(Message)Util.objectFromByteBuffer(buf); 143 } 144 catch(Exception e) { 145 System.err.println("WANPIPE.receive(): " + e); 146 return; 147 } 148 149 if(log.isInfoEnabled()) log.info("received msg " + msg); 150 hdr=(WanPipeHeader)msg.removeHeader(getName()); 151 152 153 String ch_name=null; 154 155 if(hdr.group_addr != null) 156 ch_name=hdr.group_addr; 157 158 if(group_addr == null) { 159 System.err.println("WANPIPE.receive(): group address in header was null, discarded"); 160 return; 161 } 162 163 if(ch_name != null && !group_addr.equals(ch_name)) 164 return; 165 166 passUp(new Event(Event.MSG, msg)); 167 } 168 169 170 171 172 173 public void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port) { 174 Object p=getPeer(); 175 176 passUp(new Event(Event.SUSPECT, p)); 177 } 178 179 180 public void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port) { 181 182 } 183 184 185 public void missedHeartbeat(InetAddress local, int local_port, InetAddress remote, int remote_port, int num_hbs) { 186 187 } 188 189 public void receivedHeartbeatAgain(InetAddress local, int local_port, InetAddress remote, int remote_port) { 190 191 } 192 193 194 195 196 public boolean setProperties(Properties props) {super.setProperties(props); 197 String str; 198 199 str=props.getProperty("name"); 200 if(str != null) { 201 name=str; 202 props.remove("name"); 203 } 204 205 str=props.getProperty("links"); 206 if(str != null) { 207 208 if(parseLinks(str) == false) 210 return false; 211 props.remove("links"); 212 } 213 214 if(name == null || name.length() == 0) { 215 System.err.println("WANPIPE.setProperties(): 'name' must be set"); 216 return false; 217 } 218 if(links.size() == 0) { 219 System.err.println("WANPIPE.setProperties(): no links specified (at least 1 link must be present)"); 220 return false; 221 } 222 223 if(props.size() > 0) { 224 System.err.println("WANPIPE.setProperties(): the following properties are not recognized:"); 225 props.list(System.out); 226 return false; 227 } 228 return true; 229 } 230 231 232 233 235 boolean parseLinks(String s) { 236 LinkInfo info; 237 StringTokenizer tok; 238 String src, dst; 239 int index=0; 241 s=s.replace('[', ' '); 242 s=s.replace(']', ' '); 243 s=s.trim(); 244 tok=new StringTokenizer (s, ","); 245 while(tok.hasMoreElements()) { 246 src=tok.nextToken().trim(); 247 dst=tok.nextToken().trim(); 248 info=new LinkInfo(); 249 250 index=src.indexOf('@'); 251 if(index == -1) { 252 System.err.println("WANPIPE.parseLinks(): local address " + src + " must have a @ separator"); 253 return false; 254 } 255 info.local_addr=src.substring(0, index); 256 info.local_port=Integer.parseInt(src.substring(index + 1, src.length())); 257 258 index=dst.indexOf('@'); 259 if(index == -1) { 260 System.err.println("WANPIPE.parseLinks(): remote address " + dst + " must have a @ separator"); 261 return false; 262 } 263 info.remote_addr=dst.substring(0, index); 264 info.remote_port=Integer.parseInt(dst.substring(index + 1, dst.length())); 265 266 links.add(info); 267 } 268 269 return true; 270 } 271 272 273 Object getPeer() { 274 Object ret=null; 275 if(members == null || members.size() == 0 || local_addr == null) 276 return null; 277 for(int i=0; i < members.size(); i++) 278 if(!members.elementAt(i).equals(local_addr)) 279 return members.elementAt(i); 280 return ret; 281 } 282 283 284 285 286 287 293 private void setSourceAddress(Message msg) { 294 if(msg.getSrc() == null) 295 msg.setSrc(local_addr); 296 } 297 298 299 300 301 302 private void sendUnicastMessage(Message msg) { 303 byte[] buf=null; 304 305 setSourceAddress(msg); 306 try { 307 buf=Util.objectToByteBuffer(msg); 308 } 309 catch(Exception e) { 310 System.err.println("WANPIPE.sendUnicastMessage(): " + e); 311 return; 312 } 313 314 try { 315 pipe.send(buf); 316 } 317 catch(LogicalLink.AllLinksDown links_down) { 318 System.err.println("WANPIPE.sendUnicastMessage(): WAN pipe has no currently operational " + 319 "link to send message. Discarding it."); 320 } 321 catch(LogicalLink.NoLinksAvailable no_links) { 322 System.err.println("WANPIPE.sendUnicastMessage(): WAN pipe has no physical links configured;" + 323 " cannot send message"); 324 } 325 catch(Exception e) { 326 System.err.println("WANPIPE.sendUnicastMessage(): " + e); 327 } 328 } 329 330 331 332 333 334 private void handleUpEvent(Event evt) { 335 switch(evt.getType()) { 336 337 case Event.SUSPECT: 338 break; 339 } 340 } 341 342 343 344 private void handleDownEvent(Event evt) { 345 switch(evt.getType()) { 346 347 case Event.TMP_VIEW: 348 case Event.VIEW_CHANGE: 349 synchronized(members) { 350 members.removeAllElements(); 351 Vector tmpvec=((View)evt.getArg()).getMembers(); 352 for(int i=0; i < tmpvec.size(); i++) 353 members.addElement(tmpvec.elementAt(i)); 354 } 355 break; 356 357 case Event.SUSPECT: 358 break; 359 360 case Event.GET_LOCAL_ADDRESS: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 362 break; 363 364 case Event.CONNECT: 365 group_addr=(String )evt.getArg(); 366 passUp(new Event(Event.CONNECT_OK)); 367 break; 368 369 case Event.DISCONNECT: 370 passUp(new Event(Event.DISCONNECT_OK)); 371 break; 372 373 } 374 } 375 376 377 378 379 private static class LinkInfo { 380 String local_addr=null, remote_addr=null; 381 int local_port=0, remote_port=0; 382 383 public String toString() { 384 StringBuffer ret=new StringBuffer (); 385 386 ret.append("local_addr=" + (local_addr != null? local_addr : "null")); 387 ret.append(":" + local_port); 388 ret.append(", remote_addr=" + (remote_addr != null ? remote_addr : "null")); 389 ret.append(":" + remote_port); 390 return ret.toString(); 391 } 392 } 393 394 395 public class WanPipeHeader extends Header { 396 public String group_addr=null; 397 398 399 public WanPipeHeader() {} 401 public WanPipeHeader(String n) {group_addr=n;} 402 403 404 public long size() { 405 return Header.HDR_OVERHEAD; 406 } 407 408 public String toString() { 409 return "[WanPipe: group_addr=" + group_addr + ']'; 410 } 411 412 public void writeExternal(ObjectOutput out) throws IOException { 413 out.writeObject(group_addr); 414 } 415 416 417 418 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 419 group_addr=(String )in.readObject(); 420 } 421 422 } 423 424 425 } 426 427 428 429 | Popular Tags |