1 24 package fr.dyade.aaa.agent; 25 26 import java.io.*; 27 import java.util.Vector ; 28 29 import org.objectweb.util.monolog.api.BasicLevel; 30 import org.objectweb.util.monolog.api.Logger; 31 32 import fr.dyade.aaa.util.Arrays; 33 34 37 public abstract class Network implements MessageConsumer, NetworkMBean { 38 49 long WDActivationPeriod = 1000L; 50 51 56 public long getWDActivationPeriod() { 57 return WDActivationPeriod; 58 } 59 60 65 public void setWDActivationPeriod(long WDActivationPeriod) { 66 this.WDActivationPeriod = WDActivationPeriod; 67 } 68 69 79 int WDNbRetryLevel1 = 30; 80 81 86 public int getWDNbRetryLevel1() { 87 return WDNbRetryLevel1; 88 } 89 90 95 public void setWDNbRetryLevel1(int WDNbRetryLevel1) { 96 this.WDNbRetryLevel1 = WDNbRetryLevel1; 97 } 98 99 110 long WDRetryPeriod1 = WDActivationPeriod/2; 111 112 117 public long getWDRetryPeriod1() { 118 return WDRetryPeriod1; 119 } 120 121 126 public void setWDRetryPeriod1(long WDRetryPeriod1) { 127 this.WDRetryPeriod1 = WDRetryPeriod1; 128 } 129 130 140 int WDNbRetryLevel2 = 55; 141 142 147 public int getWDNbRetryLevel2() { 148 return WDNbRetryLevel2; 149 } 150 151 156 public void setWDNbRetryLevel2(int WDNbRetryLevel2) { 157 this.WDNbRetryLevel2 = WDNbRetryLevel2; 158 } 159 160 171 long WDRetryPeriod2 = 5000L; 172 173 178 public long getWDRetryPeriod2() { 179 return WDRetryPeriod2; 180 } 181 182 187 public void setWDRetryPeriod2(long WDRetryPeriod2) { 188 this.WDRetryPeriod2 = WDRetryPeriod2; 189 } 190 191 202 long WDRetryPeriod3 = 60000L; 203 204 209 public long getWDRetryPeriod3() { 210 return WDRetryPeriod3; 211 } 212 213 218 public void setWDRetryPeriod3(long WDRetryPeriod3) { 219 this.WDRetryPeriod3 = WDRetryPeriod3; 220 } 221 222 227 public int getNbWaitingMessages() { 228 return qout.size(); 229 } 230 231 protected Logger logmon = null; 232 233 234 protected short sid; 235 236 protected int idxLS; 237 241 protected short[] servers; 242 243 transient protected String serversFN = null; 244 247 private int[] stamp; 248 249 private byte[] stampbuf = null; 250 251 private int[] bootTS = null; 252 253 transient protected String bootTSFN = null; 254 255 256 protected String name; 257 258 protected String domain; 259 260 protected int port; 261 262 protected MessageVector qout; 263 264 269 public final String getName() { 270 return name; 271 } 272 273 278 public final String getDomainName() { 279 return domain; 280 } 281 282 287 public String toString() { 288 StringBuffer strbuf = new StringBuffer (); 289 strbuf.append("(").append(super.toString()); 290 strbuf.append(",name=").append(getName()); 291 if (qout != null) strbuf.append(",qout=").append(qout.size()); 292 if (servers != null) { 293 for (int i=0; i<servers.length; i++) { 294 strbuf.append(",(").append(servers[i]).append(','); 295 strbuf.append(stamp[i]).append(')'); 296 } 297 } 298 strbuf.append(")"); 299 300 return strbuf.toString(); 301 } 302 303 308 public Network() { 309 } 310 311 318 public void insert(Message msg) { 319 qout.insert(msg); 320 } 321 322 325 public void save() throws IOException {} 326 327 331 public void restore() throws Exception { 332 sid = AgentServer.getServerId(); 333 idxLS = index(sid); 334 stampbuf = AgentServer.getTransaction().loadByteArray(name); 336 if (stampbuf == null) { 337 stampbuf = new byte[4*servers.length]; 339 stamp = new int[servers.length]; 340 bootTS = new int[servers.length]; 341 for (int i=0; i<servers.length; i++) { 343 if (i != idxLS) { 344 stamp[i] = -1; 345 bootTS[i] = -1; 346 } else { 347 stamp[i] = 0; 348 bootTS[i] = (int) (System.currentTimeMillis() /1000L); 349 } 350 } 351 AgentServer.getTransaction().save(servers, serversFN); 353 AgentServer.getTransaction().save(bootTS, bootTSFN); 354 AgentServer.getTransaction().saveByteArray(stampbuf, name); 355 } else { 356 short[] s = (short[]) AgentServer.getTransaction().load(serversFN); 358 bootTS = (int[]) AgentServer.getTransaction().load(bootTSFN); 359 stamp = new int[s.length]; 360 for (int i=0; i<stamp.length; i++) { 361 stamp[i] = ((stampbuf[(i*4)+0] & 0xFF) << 24) + 362 ((stampbuf[(i*4)+1] & 0xFF) << 16) + 363 ((stampbuf[(i*4)+2] & 0xFF) << 8) + 364 (stampbuf[(i*4)+3] & 0xFF); 365 } 366 if ((servers != null) && !Arrays.equals(servers, s)) { 368 for (int i=0; i<servers.length; i++) 369 logmon.log(BasicLevel.DEBUG, 370 "servers[" + i + "]=" + servers[i]); 371 for (int i=0; i<s.length; i++) 372 logmon.log(BasicLevel.DEBUG, 373 "servers[" + i + "]=" + s[i]); 374 375 throw new IOException("Network configuration changed"); 376 } 377 } 378 } 379 380 393 public void init(String name, int port, short[] servers) throws Exception { 394 this.name = AgentServer.getName() + '.' + name; 395 396 qout = new MessageVector(this.name, 397 AgentServer.getTransaction().isPersistent()); 398 399 this.domain = name; 400 this.port = port; 401 402 logmon = Debug.getLogger(Debug.A3Network + '.' + name); 405 logmon.log(BasicLevel.DEBUG, name + ", initialized"); 406 407 WDActivationPeriod = Long.getLong("WDActivationPeriod", 408 WDActivationPeriod).longValue(); 409 WDActivationPeriod = Long.getLong(name + ".WDActivationPeriod", 410 WDActivationPeriod).longValue(); 411 412 WDNbRetryLevel1 = Integer.getInteger("WDNbRetryLevel1", 413 WDNbRetryLevel1).intValue(); 414 WDNbRetryLevel1 = Integer.getInteger(name + ".WDNbRetryLevel1", 415 WDNbRetryLevel1).intValue(); 416 417 WDRetryPeriod1 = Long.getLong("WDRetryPeriod1", 418 WDRetryPeriod1).longValue(); 419 WDRetryPeriod1 = Long.getLong(name + ".WDRetryPeriod1", 420 WDRetryPeriod1).longValue(); 421 422 WDNbRetryLevel2 = Integer.getInteger("WDNbRetryLevel2", 423 WDNbRetryLevel2).intValue(); 424 WDNbRetryLevel2 = Integer.getInteger(name + ".WDNbRetryLevel2", 425 WDNbRetryLevel2).intValue(); 426 427 WDRetryPeriod2 = Long.getLong("WDRetryPeriod2", 428 WDRetryPeriod2).longValue(); 429 WDRetryPeriod2 = Long.getLong(name + ".WDRetryPeriod2", 430 WDRetryPeriod2).longValue(); 431 432 WDRetryPeriod3 = Long.getLong("WDRetryPeriod3", 433 WDRetryPeriod3).longValue(); 434 WDRetryPeriod3 = Long.getLong(name + ".WDRetryPeriod3", 435 WDRetryPeriod3).longValue(); 436 437 Arrays.sort(servers); 439 440 this.servers = servers; 441 this.serversFN = name + "Servers"; 442 this.bootTSFN = name + "BootTS"; 443 444 restore(); 445 } 446 447 452 synchronized void addServer(short id) throws Exception { 453 int idx = index(id); 455 if (idx >= 0) return; 456 457 idx = -idx -1; 458 int[] newStamp = new int[servers.length+1]; 460 byte[] newStampBuf = new byte[4*(servers.length+1)]; 461 int[] newBootTS = new int[servers.length+1]; 462 short[] newServers = new short[servers.length+1]; 463 int j = 0; 465 for (int i=0; i<servers.length; i++) { 466 if (i == idx) j++; 467 newServers[j] = servers[i]; 468 newBootTS[j] = bootTS[i]; 469 newStamp[j] = stamp[i]; 470 j++; 471 } 472 if (idx > 0) 473 System.arraycopy(stampbuf, 0, newStampBuf, 0, idx*4); 474 if (idx < servers.length) 475 System.arraycopy(stampbuf, idx*4, 476 newStampBuf, (idx+1)*4, (servers.length-idx)*4); 477 478 newServers[idx] = id; 479 newBootTS[idx] = -1; 480 newStamp[idx] = -1; newStampBuf[idx] = 0; newStampBuf[idx+1] = 0; newStampBuf[idx+2] = 0; newStampBuf[idx+3] = 0; 486 stamp = newStamp; 487 stampbuf = newStampBuf; 488 servers = newServers; 489 bootTS = newBootTS; 490 idxLS = index(sid); 492 493 AgentServer.getTransaction().save(servers, serversFN); 495 AgentServer.getTransaction().save(bootTS, bootTSFN); 496 AgentServer.getTransaction().saveByteArray(stampbuf, name); 497 } 498 499 504 synchronized void delServer(short id) throws Exception { 505 int idx = index(id); 507 if (idx < 0) return; 508 509 int[] newStamp = new int[servers.length-1]; 510 byte[] newStampBuf = new byte[4*(servers.length-1)]; 511 int[] newBootTS = new int[servers.length-1]; 512 short[] newServers = new short[servers.length-1]; 513 514 int j = 0; 515 for (int i=0; i<servers.length; i++) { 516 if (id == servers[i]) { 517 idx = i; 518 continue; 519 } 520 newServers[j] = servers[i]; 521 newBootTS[j] = bootTS[i]; 522 newStamp[j] = stamp[i]; 523 j++; 524 } 525 if (idx > 0) 526 System.arraycopy(stampbuf, 0, newStampBuf, 0, idx*4); 527 if (idx < (servers.length-1)) 528 System.arraycopy(stampbuf, (idx+1)*4, 529 newStampBuf, idx*4, (servers.length-idx-1)*4); 530 531 stamp = newStamp; 532 stampbuf = newStampBuf; 533 servers = newServers; 534 bootTS = newBootTS; 535 idxLS = index(sid); 537 538 AgentServer.getTransaction().save(servers, serversFN); 540 AgentServer.getTransaction().save(bootTS, bootTSFN); 541 AgentServer.getTransaction().saveByteArray(stampbuf, name); 542 } 543 544 549 synchronized void resetServer(short id, int boot) throws IOException { 550 int idx = index(id); 552 if (idx < 0) return; 553 554 556 AgentServer.getTransaction().save(servers, serversFN); 558 AgentServer.getTransaction().save(bootTS, bootTSFN); 559 AgentServer.getTransaction().saveByteArray(stampbuf, name); 560 } 561 562 567 public void post(Message msg) throws Exception { 568 if ((msg.not.expiration > 0) && 569 (msg.not.expiration < System.currentTimeMillis())) { 570 if (logmon.isLoggable(BasicLevel.DEBUG)) 571 logmon.log(BasicLevel.DEBUG, 572 getName() + ": removes expired notification " + 573 msg.from + ", " + msg.not); 574 return; 575 } 576 577 short to = AgentServer.getServerDesc(msg.to.to).gateway; 578 581 msg.source = AgentServer.getServerId(); 582 msg.dest = to; 583 msg.stamp = getSendUpdate(to); 584 585 msg.save(); 587 qout.push(msg); 589 } 590 591 597 protected final int index(short id) { 598 int idx = Arrays.binarySearch(servers, id); 599 return idx; 600 } 601 602 protected final byte[] getStamp() { 603 return stampbuf; 604 } 605 606 protected final void setStamp(byte[] stampbuf) { 607 this.stampbuf = stampbuf; 608 stamp = new int[servers.length]; 609 for (int i=0; i<stamp.length; i++) { 610 stamp[i] = ((stampbuf[(i*4)+0] & 0xFF) << 24) + 611 ((stampbuf[(i*4)+1] & 0xFF) << 16) + 612 ((stampbuf[(i*4)+2] & 0xFF) << 8) + 613 (stampbuf[(i*4)+3] & 0xFF); 614 } 615 } 616 617 private void updateStamp(int idx, int update) throws IOException { 618 stamp[idx] = update; 619 stampbuf[(idx*4)+0] = (byte)((update >>> 24) & 0xFF); 620 stampbuf[(idx*4)+1] = (byte)((update >>> 16) & 0xFF); 621 stampbuf[(idx*4)+2] = (byte)((update >>> 8) & 0xFF); 622 stampbuf[(idx*4)+3] = (byte)(update & 0xFF); 623 AgentServer.getTransaction().saveByteArray(stampbuf, name); 624 } 625 626 627 static final int DELIVER = 0; 628 634 static final int ALREADY_DELIVERED = 2; 635 636 649 private synchronized int testRecvUpdate(short source, int update) throws IOException { 650 int fromIdx = index(source); 651 652 if (update > stamp[fromIdx]) { 653 updateStamp(fromIdx, update); 654 return DELIVER; 655 } 656 return ALREADY_DELIVERED; 657 } 658 659 666 private synchronized int getSendUpdate(short to) throws IOException { 667 int update = stamp[idxLS] +1; 668 updateStamp(idxLS, update); 669 return update; 670 } 671 672 final int getBootTS() { 673 return bootTS[idxLS]; 674 } 675 676 final void testBootTS(short source, int boot) throws IOException { 677 int fromIdx = index(source); 678 679 if (boot != bootTS[fromIdx]) { 680 if (logmon.isLoggable(BasicLevel.WARN)) 681 logmon.log(BasicLevel.WARN, 682 getName() + ", reset stamp #" + source + ", " 683 + bootTS[fromIdx] + " -> " + boot); 684 685 bootTS[fromIdx] = boot; 686 AgentServer.getTransaction().save(bootTS, bootTSFN); 687 updateStamp(fromIdx, -1); 688 } 689 } 690 691 693 698 protected void deliver(Message msg) throws Exception { 699 short source = msg.getSource(); 701 702 short dest = msg.getDest(); 705 if (dest != AgentServer.getServerId()) { 706 logmon.log(BasicLevel.ERROR, 707 getName() + ", recv bad msg#" + msg.getStamp() + 708 " really to " + dest + 709 " by " + source); 710 throw new Exception ("recv bad msg#" + msg.getStamp() + 711 " really to " + dest + 712 " by " + source); 713 } 714 715 720 if (logmon.isLoggable(BasicLevel.DEBUG)) 721 logmon.log(BasicLevel.DEBUG, 722 getName() + ", recv msg#" + msg.getStamp() + 723 " from " + msg.from + 724 " to " + msg.to + 725 " by " + source); 726 727 ServerDesc desc = AgentServer.getServerDesc(source); 728 if (! desc.active) { 729 desc.active = true; 730 desc.retry = 0; 731 } 732 733 AgentServer.getTransaction().begin(); 736 737 int todo = testRecvUpdate(source, msg.getStamp()); 740 741 if (todo == DELIVER) { 742 Channel.post(msg); 746 747 if (logmon.isLoggable(BasicLevel.DEBUG)) 748 logmon.log(BasicLevel.DEBUG, 749 getName() + ", deliver msg#" + msg.getStamp()); 750 751 Channel.save(); 752 AgentServer.getTransaction().commit(); 753 Channel.validate(); 755 AgentServer.getTransaction().release(); 756 } else { 757 AgentServer.getTransaction().commit(); 760 AgentServer.getTransaction().release(); 761 } 762 } 763 764 772 public void delete() throws IllegalStateException { 773 if (isRunning()) throw new IllegalStateException (); 774 775 AgentServer.getTransaction().delete(serversFN); 776 AgentServer.getTransaction().delete(bootTSFN); 777 AgentServer.getTransaction().delete(name); 778 } 779 780 783 public void validate() { 784 qout.validate(); 785 } 786 787 public MessageQueue getQueue() { 788 return qout; 789 } 790 791 794 public void setPort(int port) { 795 this.port = port; 796 } 797 798 public final int getPort() { 799 return port; 800 } 801 } 802 | Popular Tags |