1 3 4 package org.jgroups.protocols; 5 6 7 import org.jgroups.Address; 8 import org.jgroups.Event; 9 import org.jgroups.Message; 10 import org.jgroups.View; 11 import org.jgroups.util.TimeScheduler; 12 import org.jgroups.util.Util; 13 import org.jgroups.stack.Protocol; 14 import org.jgroups.stack.RouterStub; 15 import org.jgroups.stack.IpAddress; 16 17 import java.util.Enumeration ; 18 import java.util.Properties ; 19 import java.util.Vector ; 20 import java.util.HashMap ; 21 22 23 24 25 38 public class TUNNEL extends Protocol implements Runnable { 39 final Properties properties=null; 40 String channel_name=null; 41 final Vector members=new Vector (); 42 String router_host=null; 43 int router_port=0; 44 Address local_addr=null; Thread receiver=null; 46 RouterStub stub=null; 47 private final Object stub_mutex=new Object (); 48 49 53 boolean loopback=true; 54 55 TimeScheduler timer=null; 56 57 Reconnector reconnector=null; 58 private final Object reconnector_mutex=new Object (); 59 60 62 byte[] additional_data=null; 63 64 65 public TUNNEL() { 66 } 67 68 69 public String toString() { 70 return "Protocol TUNNEL(local_addr=" + local_addr + ')'; 71 } 72 73 74 75 76 77 78 public String getName() { 79 return "TUNNEL"; 80 } 81 82 public void init() throws Exception { 83 super.init(); 84 timer=stack.timer; 85 } 86 87 public void start() throws Exception { 88 createTunnel(); passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 90 } 91 92 public void stop() { 93 if(receiver != null) 94 receiver=null; 95 teardownTunnel(); 96 stopReconnector(); 97 } 98 99 100 101 105 public void startUpHandler() { 106 ; 107 } 108 109 110 public boolean setProperties(Properties props) { 111 String str; 112 113 super.setProperties(props); 114 str=props.getProperty("router_host"); 115 if(str != null) { 116 router_host=str; 117 props.remove("router_host"); 118 } 119 120 str=props.getProperty("router_port"); 121 if(str != null) { 122 router_port=Integer.parseInt(str); 123 props.remove("router_port"); 124 } 125 126 if(log.isDebugEnabled()) { 127 log.debug("router_host=" + router_host + ";router_port=" + router_port); 128 } 129 130 if(router_host == null || router_port == 0) { 131 if(log.isErrorEnabled()) { 132 log.error("both router_host and router_port have to be set !"); 133 return false; 134 } 135 } 136 137 str=props.getProperty("loopback"); 138 if(str != null) { 139 loopback=Boolean.valueOf(str).booleanValue(); 140 props.remove("loopback"); 141 } 142 143 if(props.size() > 0) { 144 StringBuffer sb=new StringBuffer (); 145 for(Enumeration e=props.propertyNames(); e.hasMoreElements();) { 146 sb.append(e.nextElement().toString()); 147 if(e.hasMoreElements()) { 148 sb.append(", "); 149 } 150 } 151 if(log.isErrorEnabled()) log.error("The following properties are not recognized: " + sb); 152 return false; 153 } 154 return true; 155 } 156 157 158 159 public void down(Event evt) { 160 Message msg; 161 TunnelHeader hdr; 162 Address dest; 163 164 if(log.isDebugEnabled()) { 165 log.debug(evt.toString()); 166 } 167 168 if(evt.getType() != Event.MSG) { 169 handleDownEvent(evt); 170 return; 171 } 172 173 hdr=new TunnelHeader(channel_name); 174 msg=(Message)evt.getArg(); 175 dest=msg.getDest(); 176 msg.putHeader(getName(), hdr); 177 178 if(msg.getSrc() == null) 179 msg.setSrc(local_addr); 180 181 if(loopback && (dest == null || dest.equals(local_addr) || dest.isMulticastAddress())) { 185 Message copy=msg.copy(); 186 copy.setSrc(local_addr); 188 copy.setDest(dest); 189 evt=new Event(Event.MSG, copy); 190 191 193 if(observer != null) 194 observer.up(evt, up_queue.size()); 195 if(log.isTraceEnabled()) log.trace("looped back local message " + copy); 196 passUp(evt); 197 if(dest != null && !dest.isMulticastAddress()) 198 return; 199 } 200 201 if(!stub.isConnected() || !stub.send(msg, channel_name)) { startReconnector(); 203 } 204 } 205 206 207 208 void createTunnel() throws Exception { 209 if(router_host == null || router_port == 0) 210 throw new Exception ("router_host and/or router_port not set correctly; tunnel cannot be created"); 211 212 synchronized(stub_mutex) { 213 stub=new RouterStub(router_host, router_port); 214 local_addr=stub.connect(); 215 if(additional_data != null && local_addr instanceof IpAddress) 216 ((IpAddress)local_addr).setAdditionalData(additional_data); 217 } 218 if(local_addr == null) 219 throw new Exception ("could not obtain local address !"); 220 } 221 222 223 224 void teardownTunnel() { 225 synchronized(stub_mutex) { 226 if(stub != null) { 227 stub.disconnect(); 228 stub=null; 229 } 230 } 231 } 232 233 234 235 236 237 238 239 240 public void run() { 241 Message msg; 242 243 if(stub == null) { 244 if(log.isErrorEnabled()) log.error("router stub is null; cannot receive messages from router !"); 245 return; 246 } 247 248 while(receiver != null) { 249 msg=stub.receive(); 250 if(msg == null) { 251 if(receiver == null) break; 252 if(log.isErrorEnabled()) log.error("received a null message. Trying to reconnect to router"); 253 if(!stub.isConnected()) 254 startReconnector(); 255 Util.sleep(5000); 256 continue; 257 } 258 handleIncomingMessage(msg); 259 } 260 } 261 262 263 264 265 266 267 268 269 270 271 public void handleIncomingMessage(Message msg) { 272 TunnelHeader hdr=(TunnelHeader)msg.removeHeader(getName()); 273 274 if(loopback) { 276 Address dst=msg.getDest(); 277 Address SRC=msg.getSrc(); 278 279 if(dst != null && dst.isMulticastAddress() && src != null && local_addr.equals(src)) { 280 if(log.isTraceEnabled()) 281 log.trace("discarded own loopback multicast packet"); 282 return; 283 } 284 } 285 286 if(log.isDebugEnabled()) { 287 log.debug("received message " + msg); 288 } 289 290 291 292 293 String ch_name=hdr != null? hdr.channel_name : null; 294 if(ch_name != null && !channel_name.equals(ch_name)) 295 return; 296 297 passUp(new Event(Event.MSG, msg)); 298 } 299 300 301 void handleDownEvent(Event evt) { 302 303 switch(evt.getType()) { 304 305 case Event.TMP_VIEW: 306 case Event.VIEW_CHANGE: 307 synchronized(members) { 308 members.removeAllElements(); 309 Vector tmpvec=((View)evt.getArg()).getMembers(); 310 for(int i=0; i < tmpvec.size(); i++) 311 members.addElement(tmpvec.elementAt(i)); 312 } 313 break; 314 315 case Event.GET_LOCAL_ADDRESS: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 317 break; 318 319 case Event.SET_LOCAL_ADDRESS: 320 local_addr=(Address)evt.getArg(); 321 if(local_addr instanceof IpAddress && additional_data != null) 322 ((IpAddress)local_addr).setAdditionalData(additional_data); 323 break; 324 325 case Event.CONNECT: 326 channel_name=(String )evt.getArg(); 327 if(stub == null) { 328 if(log.isErrorEnabled()) log.error("CONNECT: router stub is null!"); 329 } 330 else { 331 stub.register(channel_name); 332 } 333 334 receiver=new Thread (this, "TUNNEL receiver thread"); 335 receiver.setDaemon(true); 336 receiver.start(); 337 338 passUp(new Event(Event.CONNECT_OK)); 339 break; 340 341 case Event.DISCONNECT: 342 if(receiver != null) { 343 receiver=null; 344 if(stub != null) 345 stub.disconnect(); 346 } 347 teardownTunnel(); 348 passUp(new Event(Event.DISCONNECT_OK)); 349 passUp(new Event(Event.SET_LOCAL_ADDRESS, null)); 350 break; 351 352 case Event.CONFIG: 353 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); 354 handleConfigEvent((HashMap )evt.getArg()); 355 break; 356 } 357 } 358 359 private void startReconnector() { 360 synchronized(reconnector_mutex) { 361 if(reconnector == null || reconnector.cancelled()) { 362 reconnector=new Reconnector(); 363 timer.add(reconnector); 364 } 365 } 366 } 367 368 private void stopReconnector() { 369 synchronized(reconnector_mutex) { 370 if(reconnector != null) { 371 reconnector.stop(); 372 reconnector=null; 373 } 374 } 375 } 376 377 void handleConfigEvent(HashMap map) { 378 if(map == null) return; 379 if(map.containsKey("additional_data")) 380 additional_data=(byte[])map.get("additional_data"); 381 } 382 383 384 385 386 private class Reconnector implements TimeScheduler.Task { 387 boolean cancelled=false; 388 389 390 public void stop() { 391 cancelled=true; 392 } 393 394 public boolean cancelled() { 395 return cancelled; 396 } 397 398 public long nextInterval() { 399 return 5000; 400 } 401 402 public void run() { 403 if(stub.reconnect()) { 404 stub.register(channel_name); 405 if(log.isDebugEnabled()) log.debug("reconnected"); 406 stop(); 407 } 408 } 409 } 410 411 412 } 413 | Popular Tags |