package org.objectweb.carol.cmi;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.Vector;

import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.SuspectEvent;
import org.jgroups.View;
import org.objectweb.carol.util.configuration.TraceCarol;

/**
 * Group message announcing that a server exports a stub under a key.
 *
 * All fields are transient; the wire format is produced explicitly by
 * {@link #writeObject} and consumed by {@link #readObject}, which must stay
 * in the same field order.
 */
class ExportMsg implements Serializable {
    public transient ClusterId id;
    public transient Serializable key;
    public transient byte[] stub;
    public transient int factor;

    /**
     * @param serverId cluster id of the exporting server
     * @param key      key the stub is exported under
     * @param stub     serialized stub bytes
     * @param factor   load factor sent along with the stub
     */
    public ExportMsg(
        ClusterId serverId,
        Serializable key,
        byte[] stub,
        int factor) {
        this.id = serverId;
        this.key = key;
        this.stub = stub;
        this.factor = factor;
    }

    /** Writes id, key, stub and factor, in that order. */
    private void writeObject(java.io.ObjectOutputStream out)
        throws IOException {
        id.write(out);
        out.writeObject(key);
        out.writeObject(stub);
        out.writeInt(factor);
    }

    /** Reads the fields back in the order used by {@link #writeObject}. */
    private void readObject(java.io.ObjectInputStream in)
        throws IOException, ClassNotFoundException {
        id = ClusterId.read(in);
        key = (Serializable) in.readObject();
        stub = (byte[]) in.readObject();
        factor = in.readInt();
    }
}

/**
 * Group message asking every member to re-broadcast its local exports.
 * Carries no data.
 */
class RequestExportsMsg implements Serializable {
}

/**
 * Group message announcing that a server no longer exports a key.
 * Serialized explicitly, like {@link ExportMsg}.
 */
class UnexportMsg implements Serializable {
    /** Cluster id of the server withdrawing the export. */
    public transient ClusterId i;
    /** Key being withdrawn. */
    public transient Serializable k;

    public UnexportMsg(ClusterId serverId, Serializable key) {
        i = serverId;
        k = key;
    }

    /** Writes the server id followed by the key. */
    private void writeObject(java.io.ObjectOutputStream out)
        throws IOException {
        i.write(out);
        out.writeObject(k);
    }

    /** Reads the server id followed by the key. */
    private void readObject(java.io.ObjectInputStream in)
        throws IOException, ClassNotFoundException {
        i = ClusterId.read(in);
        k = (Serializable) in.readObject();
    }
}

/**
 * Table of exports known for the whole group: maps an export key to the
 * {@link ClusterStubData} aggregating the stubs registered under that key.
 * Every access to the table is made while holding this object's monitor.
 */
class GlobalExports {
    /** key (Serializable) -> ClusterStubData. */
    private HashMap table = new HashMap();

    public GlobalExports() {
    }

    /**
     * Registers the stub of a server under a key, creating the entry when
     * the key is not known yet.
     *
     * @throws RemoteException propagated from {@link ClusterStubData}
     */
    public synchronized void put(
        ClusterId serverId,
        Serializable key,
        byte[] stub,
        int factor)
        throws RemoteException {
        ClusterStubData csd = (ClusterStubData) table.get(key);
        if (csd == null) {
            table.put(key, new ClusterStubData(serverId, stub, factor));
            return;
        }
        // A refused setStub is only reported, never raised to the caller.
        if (!csd.setStub(serverId, stub, factor)
            && TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes(
                "Warning: Object registered in the cluster as two distinct types");
        }
    }

    /**
     * Withdraws the stub of a server from a key. The entry itself is
     * dropped when {@code removeStub} returns false (presumably meaning no
     * stub remains for that key -- confirm against ClusterStubData).
     */
    public synchronized void remove(ClusterId serverId, Serializable key) {
        ClusterStubData csd = (ClusterStubData) table.get(key);
        if (csd == null) {
            return;
        }
        if (!csd.removeStub(serverId)) {
            table.remove(key);
        }
    }

    /**
     * Withdraws every stub of the given server, dropping the entries for
     * which {@code removeStub} returns false.
     */
    public synchronized void zapExports(ClusterId serverId) {
        Iterator i = table.values().iterator();
        while (i.hasNext()) {
            ClusterStubData csd = (ClusterStubData) i.next();
            if (!csd.removeStub(serverId)) {
                i.remove();
            }
        }
    }

    /**
     * @return the data registered under the key, or null when unknown
     */
    public ClusterStubData getClusterStubData(Serializable key)
        throws RemoteException {
        synchronized (this) {
            return (ClusterStubData) table.get(key);
        }
    }

    /**
     * @return a snapshot copy of the known keys; later changes to the table
     *         are not reflected in the returned set
     */
    public synchronized Set keySet() {
        return new HashSet(table.keySet());
    }
}

/**
 * Exports made by the local server: maps an export key to the serialized
 * object bytes.
 */
class LocalExports {
    private HashMap map = new HashMap();
    // Only ever reset to null in this class; kept as in the original.
    private byte[] buf = null;

    /**
     * @throws NullPointerException when key or obj is null
     */
    public synchronized void put(Serializable key, byte[] obj) {
        if ((key == null) || (obj == null)) {
            throw new NullPointerException();
        }
        map.put(key, obj);
        buf = null;
    }

    /**
     * @return the value stored under the key, or null when absent
     * @throws NullPointerException when key is null
     */
    public synchronized Object get(Serializable key) {
        if (key == null) {
            throw new NullPointerException();
        }
        return map.get(key);
    }

    public synchronized void remove(Serializable key) {
        if (map.remove(key) != null) {
            buf = null;
        }
    }

    /**
     * Returns the live internal map, not a copy. Callers must hold this
     * object's monitor while using it, as the synchronized methods do.
     */
    public HashMap getmap() {
        return map;
    }
}

/**
 * Helper thread that sends a one-byte probe datagram to a multicast group,
 * up to {@link #RETRIES} times, sleeping {@link #TIMEOUT} ms after each
 * send. Stops early when interrupted.
 */
class BindAddressChooser extends Thread {
    private MulticastSocket sock;
    private InetAddress group;
    private int port;
    /** Pause after each probe, in milliseconds. */
    static final int TIMEOUT = 10;
    /** Maximum number of probes sent. */
    static final int RETRIES = 20;

    BindAddressChooser(MulticastSocket sock, InetAddress group, int port) {
        this.sock = sock;
        this.group = group;
        this.port = port;
    }

    public void run() {
        for (int i = 0; i < RETRIES; i++) {
            byte[] msg = { 0 };
            DatagramPacket pkt =
                new DatagramPacket(msg, msg.length, group, port);
            try {
                sock.send(pkt);
            } catch (IOException ignored) {
                // Best effort: a failed probe is simply retried on the
                // next iteration.
            }
            try {
                Thread.sleep(TIMEOUT);
            } catch (InterruptedException e1) {
                // The owner interrupts this thread once it is done probing.
                return;
            }
        }
    }
}

/**
 * Keeps a table of exported stubs replicated among the members of a JGroups
 * group. Local exports are broadcast to the group; exports, unexports and
 * view changes received from the channel update the global table.
 */
class DistributedEquivSystem {
    private String chan_props;
    private String groupname;
    private Channel chan;
    private MessageDequeuer mdq;
    private View view;
    private Address my_addr;
    private ClusterId my_id;
    private LocalExports localExports = new LocalExports();
    private GlobalExports globalExports = new GlobalExports();
    /** Address -> ClusterId of the servers heard from so far. */
    private HashMap idmap = new HashMap();

    /**
     * Thread pulling events from the channel and dispatching them until it
     * is interrupted or the channel raises an exception.
     */
    private class MessageDequeuer extends Thread {
        public void run() {
            if (TraceCarol.isDebugCmiDes()) {
                TraceCarol.debugCmiDes("Message dequeuer started");
            }
            Object msg;
            try {
                while (!isInterrupted()) {
                    msg = chan.receive(0);
                    if (msg == null) {
                        continue;
                    }
                    if (msg instanceof Message) {
                        receive((Message) msg);
                    } else if (msg instanceof View) {
                        viewAccepted((View) msg);
                    } else if (msg instanceof SuspectEvent) {
                        // Deliberately ignored: nothing is done on suspicion.
                    } else if (TraceCarol.isDebugCmiDes()) {
                        TraceCarol.debugCmiDes(
                            "Received but not supported : " + msg.getClass());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (TraceCarol.isDebugCmiDes()) {
                TraceCarol.debugCmiDes("Message dequeuer finished.");
            }
        }
    }

    /**
     * Probes for a usable bind address: joins the multicast group, has a
     * {@link BindAddressChooser} send probe datagrams, and returns the
     * source address of the first probe received that the socket accepts
     * as its interface.
     *
     * The socket is always closed and the sender always interrupted before
     * returning, and a receive timeout guarantees that the method returns
     * even when no datagram ever arrives.
     *
     * @return the host address found, or null on I/O failure or when no
     *         suitable probe arrived in time
     */
    private static String chooseBindAddress2(
        String groupname_or_ip,
        int port) {
        int ip_ttl = 0;
        int window = BindAddressChooser.RETRIES * BindAddressChooser.TIMEOUT;
        MulticastSocket sock = null;
        Thread sender = null;
        try {
            InetAddress group = InetAddress.getByName(groupname_or_ip);
            sock = new MulticastSocket(port);
            sock.setTimeToLive(ip_ttl);
            // Without a timeout, receive() could block past the deadline
            // forever; the resulting SocketTimeoutException is an
            // IOException and is handled like any other receive failure.
            sock.setSoTimeout(window);
            sock.joinGroup(group);
            sender = new BindAddressChooser(sock, group, port);
            sender.start();

            byte[] buf = new byte[2];
            DatagramPacket recv = new DatagramPacket(buf, buf.length);
            long end = System.currentTimeMillis() + window;
            do {
                recv.setData(buf, 0, buf.length);
                sock.receive(recv);
                // Only a single zero byte is one of our probes.
                if ((recv.getLength() != 1) || (recv.getData()[0] != 0)) {
                    continue;
                }
                InetAddress a = recv.getAddress();
                try {
                    sock.setInterface(a);
                } catch (SocketException e) {
                    // Not a local interface address: wait for another probe.
                    continue;
                }
                return a.getHostAddress();
            } while (System.currentTimeMillis() < end);
            return null;
        } catch (IOException e) {
            return null;
        } finally {
            if (sender != null) {
                sender.interrupt();
            }
            if (sock != null) {
                sock.close();
            }
        }
    }

    /**
     * Derives the bind address from the configured multicast interface
     * mask.
     *
     * @return the host address when the mask matches exactly one local
     *         address; null when no mask is configured, when the mask
     *         cannot be processed, or when it matches zero or several
     *         addresses
     */
    private static String chooseBindAddress() {
        String s = Config.getMulticastItf();
        if (s == null) {
            return null;
        }
        LinkedList l;
        try {
            InetMask m = new InetMask(s);
            l = m.filterLocal();
        } catch (Exception e) {
            return null;
        }
        if (l.size() != 1) {
            return null;
        }
        return ((InetAddress) l.getFirst()).getHostAddress();
    }

    /**
     * Connects to the configured group, starts the dequeuer thread and
     * asks the other members for their exports.
     */
    DistributedEquivSystem()
        throws
            ConfigException,
            ClusterException,
            org.jgroups.ChannelException,
            org.jgroups.ChannelClosedException {
        ClusterIdFactory.start();
        String mcast_addr = Config.getMulticastAddress();
        int mcast_port = Config.getMulticastPort();
        String bind_addr = chooseBindAddress();
        if (bind_addr != null) {
            bind_addr = ";bind_addr=" + bind_addr;
        } else {
            bind_addr = "";
        }

        chan_props = "UDP(mcast_addr="
            + mcast_addr
            + ";mcast_port="
            + mcast_port
            + bind_addr
            + ";ip_ttl=32;"
            + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
            + "PING(timeout=2000;num_initial_members=3):"
            + "MERGE2(min_interval=5000;max_interval=10000):"
            + "FD(timeout=2000;max_tries=3;shun=true):"
            + "VERIFY_SUSPECT(timeout=1500):"
            + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
            + "UNICAST(timeout=1200,2400,3600):"
            + "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=2000):"
            + "FRAG(frag_size=4096;down_thread=false;up_thread=false):"
            + "pbcast.GMS(join_timeout=3000;join_retry_timeout=2000;"
            + "shun=false;print_local_addr=true)";

        groupname = Config.getMulticastGroupName();
        chan = new JChannel(chan_props);
        chan.connect(groupname);
        my_addr = chan.getLocalAddress();
        my_id = ClusterIdFactory.getLocalId();
        idmap.put(my_addr, my_id);

        // Start from a view holding only ourselves; the first view received
        // from the channel is diffed against it in viewAccepted().
        Vector v = new Vector();
        v.add(my_addr);
        view = new View(my_addr, 0, v);

        mdq = new MessageDequeuer();
        mdq.setContextClassLoader(
            Thread.currentThread().getContextClassLoader());
        mdq.start();
        if (TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes("sending RequestExportsMsg");
        }
        broadcast(new RequestExportsMsg());
        if (TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes(
                "DistributedEquivSystem started on "
                    + Config.getMulticastAddress()
                    + ":"
                    + Config.getMulticastPort()
                    + "/"
                    + Config.getMulticastGroupName()
                    + ", cluster Id "
                    + my_id);
        }
    }

    /**
     * Serializes the message and sends it to the whole group (null
     * destination). Failures are only traced: broadcasting is best effort.
     */
    private void broadcast(Serializable msg) {
        ByteArrayOutputStream outs = new ByteArrayOutputStream();
        try {
            CmiOutputStream out = new CmiOutputStream(outs);
            out.writeObject(msg);
            out.flush();
            Message m = new Message(null, my_addr, outs.toByteArray());
            chan.send(m);
            if (TraceCarol.isDebugCmiDes()) {
                TraceCarol.debugCmiDes("broadcast sent");
            }
        } catch (Exception e) {
            if (TraceCarol.isDebugCmiDes()) {
                TraceCarol.debugCmiDes("when broadcasting " + e.toString());
            }
        }
    }

    /**
     * Installs a new view. Members present in the previous view but absent
     * from the new one have their exports and id mapping removed; members
     * that are new are only traced.
     */
    private void viewAccepted(View v2) {
        if (TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes("New view accepted : " + v2);
        }
        LinkedList newMembers = new LinkedList();
        LinkedList oldMembers = new LinkedList();

        // Sort both member lists so they can be diffed in a single merge
        // pass.
        Object[] ar1 = ((Vector) (view.getMembers().clone())).toArray();
        Arrays.sort(ar1);
        Iterator i1 = Arrays.asList(ar1).iterator();
        Object[] ar2 = ((Vector) (v2.getMembers().clone())).toArray();
        Arrays.sort(ar2);
        Iterator i2 = Arrays.asList(ar2).iterator();

        view = v2;
        Address a1 = (i1.hasNext()) ? (Address) i1.next() : null;
        Address a2 = (i2.hasNext()) ? (Address) i2.next() : null;
        while (true) {
            // d < 0: a1 is only in the old view; d > 0: a2 is only in the
            // new view; d == 0: the member is in both.
            int d;
            if (a1 != null) {
                if (a2 != null) {
                    d = a1.compareTo(a2);
                } else {
                    d = -1;
                }
            } else {
                if (a2 == null) {
                    break;
                }
                d = 1;
            }

            if (d > 0) {
                newMembers.addLast(a2);
                a2 = (i2.hasNext()) ? (Address) i2.next() : null;
            } else if (d < 0) {
                oldMembers.addLast(a1);
                a1 = (i1.hasNext()) ? (Address) i1.next() : null;
            } else {
                a1 = (i1.hasNext()) ? (Address) i1.next() : null;
                a2 = (i2.hasNext()) ? (Address) i2.next() : null;
            }
        }

        while (oldMembers.size() > 0) {
            Address a = (Address) oldMembers.removeFirst();
            ClusterId id = (ClusterId) idmap.get(a);
            if (id != null) {
                globalExports.zapExports(id);
            }
            idmap.remove(a);
            if (TraceCarol.isDebugCmiDes()) {
                if (id == null) {
                    TraceCarol.debugCmiDes("Member " + a + " removed");
                } else {
                    TraceCarol.debugCmiDes(
                        "Member " + a + " removed (server id : " + id + ")");
                }
            }
        }

        if (TraceCarol.isDebugCmiDes()) {
            while (newMembers.size() > 0) {
                Address a = (Address) newMembers.removeFirst();
                TraceCarol.debugCmiDes("New member " + a);
            }
        }
    }

    /**
     * Checks that a sender address is consistently used with one cluster
     * id. The first id seen for an address is recorded; later messages from
     * that address must carry an equal id.
     *
     * @return the id when accepted, or null when the message must be
     *         ignored
     */
    private ClusterId checkServer(ClusterId id, Address ad) {
        ClusterId known = (ClusterId) idmap.get(ad);
        if (known == null) {
            idmap.put(ad, id);
            return id;
        }
        if (known.equals(id)) {
            return id;
        }
        if (TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes("Message ignored (server rejected)");
        }
        return null;
    }

    /** @return true when the id is the local server's id */
    private boolean self(ClusterId id) {
        return my_id.equals(id);
    }

    /**
     * Decodes a group message and applies it: exports and unexports from
     * other servers update the global table, and a request for exports
     * makes this server re-broadcast all its local exports. A message that
     * cannot be decoded ends up traced as being of unknown type.
     */
    private void receive(Message m) {
        Object o;
        byte[] buf = m.getBuffer();
        if (buf == null) {
            if (TraceCarol.isDebugCmiDes()) {
                TraceCarol.debugCmiDes("buf == null");
            }
            o = null;
        } else {
            try {
                ByteArrayInputStream in_stream = new ByteArrayInputStream(buf);
                CmiInputStream in = new CmiInputStream(in_stream);
                o = in.readObject();
            } catch (Exception e) {
                if (TraceCarol.isDebugCmiDes()) {
                    TraceCarol.debugCmiDes(e.toString());
                }
                o = null;
            }
        }

        Address from = m.getSrc();
        if (o instanceof ExportMsg) {
            ExportMsg pm = (ExportMsg) o;
            ClusterId id = checkServer(pm.id, from);
            if (id == null) {
                return;
            }
            if (TraceCarol.isDebugCmiDes()) {
                TraceCarol.debugCmiDes(
                    "Put message received from server "
                        + from
                        + ", ID : "
                        + pm.key);
            }
            // Our own exports were already recorded by exportObject().
            if (!self(id)) {
                try {
                    byte[] stub = pm.stub;
                    if (stub != null) {
                        globalExports.put(id, pm.key, stub, pm.factor);
                    }
                } catch (RemoteException e) {
                    // The export is dropped, but no longer silently.
                    if (TraceCarol.isDebugCmiDes()) {
                        TraceCarol.debugCmiDes(e.toString());
                    }
                }
            }
        } else if (o instanceof UnexportMsg) {
            UnexportMsg rm = (UnexportMsg) o;
            ClusterId id = checkServer(rm.i, from);
            if (id == null) {
                return;
            }
            if (TraceCarol.isDebugCmiDes()) {
                TraceCarol.debugCmiDes(
                    "Remove message received from server "
                        + from
                        + ", ID : "
                        + rm.k);
            }
            if (!self(id)) {
                globalExports.remove(id, rm.k);
            }
        } else if (o instanceof RequestExportsMsg) {
            if (TraceCarol.isDebugCmiDes()) {
                TraceCarol.debugCmiDes("sending local exports");
            }

            synchronized (localExports) {
                HashMap h = localExports.getmap();
                Iterator i = h.entrySet().iterator();
                while (i.hasNext()) {
                    Map.Entry e = (Map.Entry) i.next();
                    // Keys are any Serializable (see exportObject), not
                    // necessarily Strings.
                    broadcast(
                        new ExportMsg(
                            my_id,
                            (Serializable) e.getKey(),
                            (byte[]) e.getValue(),
                            Config.getLoadFactor()));
                }
            }

        } else if (TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes(
                "Message of unknown type received from server " + from);
        }
    }

    /**
     * Interrupts the dequeuer thread. The channel itself is not closed
     * here.
     */
    void terminate() {
        mdq.interrupt();
    }

    /**
     * Exports an object under a key: records it locally and globally, then
     * broadcasts it to the group.
     *
     * @return false when the key is already exported locally, true
     *         otherwise
     */
    boolean exportObject(Serializable key, byte[] obj) throws RemoteException {
        if (TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes(
                "exportObject(" + key + ", " + obj.getClass().getName() + ")");
        }
        int factor;
        synchronized (localExports) {
            Object cur = localExports.get(key);
            if (cur != null) {
                return false;
            }
            localExports.put(key, obj);
            factor = Config.getLoadFactor();
            globalExports.put(my_id, key, obj, factor);
            broadcast(new ExportMsg(my_id, key, obj, factor));
        }
        return true;
    }

    /**
     * Withdraws a local export and broadcasts the withdrawal.
     *
     * @return false when the key is not exported locally, true otherwise
     */
    boolean unexportObject(Serializable key) {
        if (TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes("unexportObject(" + key + ")");
        }
        synchronized (localExports) {
            Object cur = localExports.get(key);
            if (cur == null) {
                return false;
            }
            localExports.remove(key);
            globalExports.remove(my_id, key);
            broadcast(new UnexportMsg(my_id, key));
        }
        return true;
    }

    /**
     * @return the cluster-wide stub data registered under the key, or null
     *         when the key is unknown
     */
    ClusterStubData getGlobal(Serializable key) throws RemoteException {
        if (TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes("getGlobal(" + key + ")");
        }
        return globalExports.getClusterStubData(key);
    }

    /**
     * Looks up a local export.
     *
     * NOTE(review): values are stored through LocalExports.put, which only
     * accepts byte[]; the cast to Remote below therefore looks like it
     * fails with ClassCastException for any key that is present -- confirm
     * the intended behavior with the callers.
     */
    Remote getLocal(Serializable key) {
        if (TraceCarol.isDebugCmiDes()) {
            TraceCarol.debugCmiDes("getLocal(" + key + ")");
        }
        return (Remote) localExports.get(key);
    }

    /** @return a snapshot of the keys known in the global table */
    Set keySet() {
        return globalExports.keySet();
    }
}