1 3 package org.jgroups.stack; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.Address; 9 import org.jgroups.Message; 10 import org.jgroups.protocols.TunnelHeader; 11 import org.jgroups.util.List; 12 import org.jgroups.util.Util; 13 import org.jgroups.util.ExposedByteArrayOutputStream; 14 import org.jgroups.util.Buffer; 15 16 import java.io.DataInputStream ; 17 import java.io.DataOutputStream ; 18 import java.io.ByteArrayInputStream ; 19 import java.net.Socket ; 20 21 22 23 24 public class RouterStub { 25 String router_host=null; int router_port=0; Socket sock=null; final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(512); 29 DataOutputStream output=null; DataInputStream input=null; Address local_addr=null; static final long RECONNECT_TIMEOUT=5000; private volatile boolean connected=false; 34 35 private volatile boolean reconnect=false; 37 protected static final Log log=LogFactory.getLog(RouterStub.class); 38 39 40 45 public RouterStub(String router_host, int router_port) { 46 this.router_host=router_host != null? router_host : "localhost"; 47 this.router_port=router_port; 48 } 49 50 51 public boolean isConnected() { 52 return connected; 53 } 54 55 62 public synchronized Address connect() throws Exception { 63 Address ret=null; 64 int len=0; 65 byte[] buf; 66 67 try { 68 sock=new Socket (router_host, router_port); 69 sock.setSoLinger(true, 500); 70 71 input=new DataInputStream (sock.getInputStream()); 73 len=input.readInt(); 74 buf=new byte[len]; 75 input.readFully(buf); 76 ret=(Address)Util.objectFromByteBuffer(buf); 77 output=new DataOutputStream (sock.getOutputStream()); 78 connected=true; 79 } 80 catch(Exception e) { 81 connected=false; 82 if(sock != null) 83 sock.close(); 84 throw e; 85 } 86 87 if(ret == null && sock != null) 89 ret=new org.jgroups.stack.IpAddress(sock.getLocalPort()); 90 91 if(local_addr == null) 93 local_addr=ret; 94 95 return ret; 96 } 97 98 99 100 public synchronized void disconnect() { 101 if(output != null) { 102 try { 103 output.close(); 104 output=null; 105 } 106 catch(Exception e) { 107 } 108 } 109 110 if(input != null) { 111 try { 112 input.close(); 113 input=null; 114 } 115 catch(Exception e) { 116 } 117 } 118 119 if(sock != null) { 120 try { 121 sock.close(); 122 sock=null; 123 } 124 catch(Exception e) { 125 } 126 } 127 connected=false; 128 reconnect=false; 130 } 131 132 133 138 public boolean register(String groupname) { 139 byte[] buf=null; 140 141 if(sock == null || output == null || input == null) { 142 if(log.isErrorEnabled()) log.error("no connection to router (groupname=" + groupname + ')'); 143 connected=false; 144 return false; 145 } 146 147 if(groupname == null || groupname.length() == 0) { 148 if(log.isErrorEnabled()) log.error("groupname is null"); 149 return false; 150 } 151 152 if(local_addr == null) { 153 if(log.isErrorEnabled()) log.error("local_addr is null"); 154 return false; 155 } 156 157 try { 158 buf=Util.objectToByteBuffer(local_addr); 159 output.writeInt(Router.REGISTER); 160 output.writeUTF(groupname); 161 output.writeInt(buf.length); 162 output.write(buf, 0, buf.length); output.flush(); 164 } 165 catch(Exception e) { 166 if(log.isErrorEnabled()) log.error("failure: " + Util.getStackTrace(e)); 167 connected=false; 168 return false; 169 } 170 return true; 171 } 172 173 174 179 public List get(String groupname) { 180 List ret=null; 181 Socket tmpsock=null; 182 DataOutputStream tmpOutput=null; 183 DataInputStream tmpInput=null; 184 int len; 185 byte[] buf; 186 187 188 if(groupname == null || groupname.length() == 0) { 189 if(log.isErrorEnabled()) log.error("groupname is null"); 190 return null; 191 } 192 193 try { 194 tmpsock=new Socket (router_host, router_port); 195 tmpsock.setSoLinger(true, 500); 196 tmpInput=new DataInputStream (tmpsock.getInputStream()); 197 198 len=tmpInput.readInt(); buf=new byte[len]; tmpInput.readFully(buf); 201 tmpOutput=new DataOutputStream (tmpsock.getOutputStream()); 202 203 tmpOutput.writeInt(Router.GET); 205 tmpOutput.writeUTF(groupname); 206 207 len=tmpInput.readInt(); 209 if(len == 0) 210 return null; 211 212 buf=new byte[len]; 213 tmpInput.readFully(buf); 214 ret=(List)Util.objectFromByteBuffer(buf); 215 } 216 catch(Exception e) { 217 if(log.isErrorEnabled()) log.error("exception=" + e); 218 } 219 finally { 220 try { 221 if(tmpOutput != null) tmpOutput.close(); 222 } 223 catch(Exception e) { 224 } 225 try { 226 if(tmpInput != null) tmpInput.close(); 227 } 228 catch(Exception e) { 229 } 230 try { 231 if(tmpsock != null) tmpsock.close(); 232 } 233 catch(Exception e) { 234 } 235 } 236 return ret; 237 } 238 239 240 242 public boolean send(Message msg, String groupname) { 243 Address dst_addr=null; 244 245 if(sock == null || output == null || input == null) { 246 if(log.isErrorEnabled()) log.error("no connection to router (groupname=" + groupname + ')'); 247 connected=false; 248 return false; 249 } 250 251 if(msg == null) { 252 if(log.isErrorEnabled()) log.error("message is null"); 253 return false; 254 } 255 256 try { 257 dst_addr=msg.getDest(); out_stream.reset(); 259 DataOutputStream tmp=new DataOutputStream (out_stream); 260 msg.writeTo(tmp); 261 tmp.close(); 262 Buffer buf=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size()); 263 264 output.writeUTF(groupname); 266 267 Util.writeAddress(dst_addr, output); 269 270 output.writeInt(buf.getLength()); 272 273 output.write(buf.getBuf(), 0, buf.getLength()); 275 } 276 catch(Exception e) { 277 if(log.isErrorEnabled()) log.error("failed sending message to " + dst_addr, e); 278 connected=false; 279 return false; 280 } 281 return true; 282 } 283 284 285 287 public Message receive() { 288 Message ret=null; 289 byte[] buf=null; 290 int len; 291 292 if(sock == null || output == null || input == null) { 293 if(log.isErrorEnabled()) log.error("no connection to router"); 294 connected=false; 295 return null; 296 } 297 try { 298 len=input.readInt(); 299 if(len == 0) { 300 ret=null; 301 } 302 else { 303 buf=new byte[len]; 304 input.readFully(buf, 0, len); 305 ret=new Message(); 306 ByteArrayInputStream tmp=new ByteArrayInputStream (buf); 307 DataInputStream in=new DataInputStream (tmp); 308 ret.readFrom(in); 309 in.close(); 310 } 311 } 312 catch(Exception e) { 313 if (connected) { 314 if(log.isTraceEnabled()) log.trace("failed receiving message", e); 315 } 316 connected=false; 317 return null; 318 } 319 320 if(log.isTraceEnabled()) log.trace("received "+ret); 321 return ret; 322 } 323 324 325 326 public boolean reconnect(int max_attempts) { 327 Address new_addr=null; 328 int num_atttempts=0; 329 330 if(connected) return false; 331 disconnect(); 332 reconnect=true; 333 while(reconnect && (num_atttempts++ < max_attempts || max_attempts == -1)) { 334 try { 335 if((new_addr=connect()) != null) 336 break; 337 } 338 catch(Exception ex) { 339 if(log.isWarnEnabled()) log.warn("exception is " + ex); 340 } 341 if(max_attempts == -1) 342 Util.sleep(RECONNECT_TIMEOUT); 343 } 344 if(new_addr == null) { 345 return false; 346 } 347 if(log.isWarnEnabled()) log.warn("client reconnected, new address is " + new_addr); 348 return true; 349 } 350 351 352 public boolean reconnect() { 353 return reconnect(-1); 354 } 355 356 public static void main(String [] args) { 357 if(args.length != 2) { 358 System.out.println("RouterStub <host> <port>"); 359 return; 360 } 361 RouterStub stub=new RouterStub(args[0], Integer.parseInt(args[1])); 362 Address my_addr; 363 boolean rc; 364 final String groupname="BelaGroup"; 365 Message msg; 366 List mbrs; 367 368 try { 369 my_addr=stub.connect(); 370 System.out.println("My address is " + my_addr); 371 372 System.out.println("Registering under " + groupname); 373 rc=stub.register(groupname); 374 System.out.println("Done, rc=" + rc); 375 376 377 System.out.println("Getting members of " + groupname + ": "); 378 mbrs=stub.get(groupname); 379 System.out.println("Done, mbrs are " + mbrs); 380 381 382 for(int i=0; i < 10; i++) { 383 msg=new Message(null, my_addr, "Bela #" + i); 384 msg.putHeader("TUNNEL", new TunnelHeader(groupname)); 385 rc=stub.send(msg, groupname); 386 System.out.println("Sent msg, rc=" + rc); 387 } 388 389 for(int i=0; i < 10; i++) { 390 System.out.println("stub.receive():"); 391 msg=stub.receive(); 392 System.out.println("Received msg"); 393 } 394 395 } 396 catch(Exception ex) { 397 System.err.println(ex); 398 } 399 finally { 400 stub.disconnect(); 401 } 402 } 403 404 405 } 406 | Popular Tags |