package fr.dyade.aaa.agent;

import java.io.*;
import java.net.*;
import java.util.*;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.util.*;

/**
 * <code>PoolNetwork</code> is a stream based network component that keeps
 * one <code>NetSession</code> per remote server and bounds the number of
 * sessions registered as active to <code>nbMaxCnx</code>.
 * <p>
 * Three daemons cooperate:
 * <ul>
 * <li><code>WakeOnConnection</code> accepts incoming connections and hands
 *     them to the session of the connecting server,</li>
 * <li><code>Dispatcher</code> takes messages from <code>qout</code> and gives
 *     them to the session of the destination server,</li>
 * <li><code>WatchDog</code> periodically restarts sessions that are not
 *     running but still have messages waiting in their send list.</li>
 * </ul>
 */
public class PoolNetwork extends StreamNetwork {
  /** Daemon accepting incoming connections. */
  WakeOnConnection wakeOnConnection = null;
  /**
   * One session per remote server, indexed like the <code>servers</code>
   * array given to {@link #init}; the slot of the local server is null.
   */
  NetSession sessions[] = null;
  /** Daemon forwarding outgoing messages to the sessions. */
  Dispatcher dispatcher = null;
  /** Daemon retrying connections for sessions with waiting messages. */
  WatchDog watchDog = null;

  /** Maximum number of sessions registered in <code>activeSessions</code>. */
  static int nbMaxCnx;
  /** Number of slots of <code>activeSessions</code> currently in use. */
  int nbActiveCnx = 0;
  /** Sessions registered as active; also used as the lock guarding them. */
  NetSession activeSessions[];
  /** Logical clock used to stamp session activity (see NetSession.last). */
  long current = 0L;

  /**
   * Creates a new network component; it must be configured with
   * {@link #init} before being started.
   */
  public PoolNetwork() throws Exception {
    super();
  }

  /**
   * Initializes this component: delegates to the superclass, then creates
   * one session per remote server and the three daemons.
   *
   * @param name     the name of this network component.
   * @param port     the listen port, passed to the superclass.
   * @param servers  the identifiers of the servers of this network; no
   *                 session is created for the local server.
   * @exception Exception  propagated from the superclass or from the
   *                       creation of the listening daemon.
   */
  public void init(String name, int port, short[] servers) throws Exception {
    super.init(name, port, servers);

    sessions = new NetSession[servers.length];
    for (int i=0; i<sessions.length; i++) {
      if (servers[i] != AgentServer.getServerId())
        sessions[i] = new NetSession(getName(), servers[i]);
    }
    wakeOnConnection = new WakeOnConnection(getName(), logmon);
    dispatcher = new Dispatcher(getName(), logmon);
    watchDog = new WatchDog(getName(), logmon);
  }

  /**
   * Starts the component. The maximum number of connections is read from
   * the property <code>&lt;name&gt;.nbMaxCnx</code>, then from
   * <code>PoolNetwork.nbMaxCnx</code>, and defaults to 5.
   *
   * @exception Exception  if the component is already running or if a
   *                       session or daemon cannot be started; the error is
   *                       logged before being rethrown.
   */
  public void start() throws Exception {
    logmon.log(BasicLevel.DEBUG, getName() + ", starting");
    try {
      nbMaxCnx = AgentServer.getInteger(getName() + ".nbMaxCnx").intValue();
    } catch (Exception exc) {
      // No component specific value: fall back to the global property.
      try {
        nbMaxCnx = AgentServer.getInteger("PoolNetwork.nbMaxCnx").intValue();
      } catch (Exception exc2) {
        // No property at all: use the built-in default.
        nbMaxCnx = 5;
      }
    }

    try {
      if (isRunning())
        throw new IOException("Consumer already running.");

      for (int i=0; i<sessions.length; i++) {
        if (sessions[i] != null) sessions[i].init();
      }

      activeSessions = new NetSession[nbMaxCnx];

      wakeOnConnection.start();
      dispatcher.start();
      watchDog.start();
    } catch (Exception exc) {
      logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);
      throw exc;
    }
    logmon.log(BasicLevel.DEBUG, getName() + ", started");
  }

  /**
   * Wakes up the watch-dog daemon, if any, so that it examines the
   * sessions without waiting for the end of its activation period.
   */
  public void wakeup() {
    if (watchDog != null) watchDog.wakeup();
  }

  /**
   * Stops the three daemons then every session. Components that were never
   * created (for example when {@link #init} was not called) are skipped.
   */
  public void stop() {
    if (wakeOnConnection != null) wakeOnConnection.stop();
    if (dispatcher != null) dispatcher.stop();
    if (watchDog != null) watchDog.stop();
    if (sessions != null) {
      for (int i=0; i<sessions.length; i++) {
        if (sessions[i] != null) sessions[i].stop();
      }
    }
    logmon.log(BasicLevel.DEBUG, getName() + ", stopped");
  }

  /**
   * Tests if the component is running.
   *
   * @return true only if the three daemons exist and are all running.
   */
  public boolean isRunning() {
    return (wakeOnConnection != null) && wakeOnConnection.isRunning() &&
      (dispatcher != null) && dispatcher.isRunning() &&
      (watchDog != null) && watchDog.isRunning();
  }

  /**
   * Returns the session handling the specified server.
   *
   * @param sid  the server identifier.
   * @return the slot of <code>sessions</code> at <code>index(sid)</code>;
   *         null for the local server.
   */
  final NetSession getSession(short sid) {
    return sessions[index(sid)];
  }

  /**
   * Returns a string representation of this component: the superclass
   * description followed by each daemon and each session, one per line.
   *
   * @return a string representation of this component.
   */
  public String toString() {
    StringBuffer strbuf = new StringBuffer();

    strbuf.append(super.toString()).append("\n\t");
    if (wakeOnConnection != null)
      strbuf.append(wakeOnConnection.toString()).append("\n\t");
    if (dispatcher != null)
      strbuf.append(dispatcher.toString()).append("\n\t");
    if (watchDog != null)
      strbuf.append(watchDog.toString()).append("\n\t");
    if (sessions != null) {
      for (int i=0; i<sessions.length; i++) {
        if (sessions[i] != null)
          strbuf.append(sessions[i].toString()).append("\n\t");
      }
    }

    return strbuf.toString();
  }

  /**
   * Vector of messages waiting for an acknowledge, with a removal
   * operation keyed by message stamp.
   */
  final class MessageVector extends Vector {
    /**
     * Removes and returns the first message whose stamp is the given one.
     * Elements that are not <code>Message</code> instances are skipped.
     *
     * @param stamp  the stamp of the message to remove.
     * @return the removed message.
     * @exception NoSuchElementException  if no message has this stamp.
     */
    public synchronized Message removeMessage(int stamp) {
      for (int index=0; index<elementCount; index++) {
        Object element = elementData[index];
        if (! (element instanceof Message)) continue;

        Message msg = (Message) element;
        if (msg.getStamp() == stamp) {
          // Structural modification: only counted when an element is
          // actually removed.
          modCount++;
          int j = elementCount - index - 1;
          if (j > 0) {
            System.arraycopy(elementData, index + 1, elementData, index, j);
          }
          elementCount--;
          // Clears the freed slot so that it does not retain the message.
          elementData[elementCount] = null;

          return msg;
        }
      }
      throw new NoSuchElementException();
    }
  }

  /**
   * Connection with one remote server: owns the socket, the list of
   * messages waiting for an acknowledge and the thread reading incoming
   * messages.
   */
  final class NetSession implements Runnable {
    /** Identifier of the remote server. */
    private short sid;
    /** True while the reading thread should keep running. */
    private volatile boolean running = false;
    /**
     * True while the reading thread is waiting for a message; in this state
     * {@link #stop} interrupts the thread and closes the connection.
     */
    private boolean canStop = false;
    /** The thread reading incoming messages. */
    private Thread thread = null;
    /** The session's name, used in log messages and as thread name. */
    private String name = null;

    /**
     * True while a locally initiated connection is being established
     * (see {@link #localStart}).
     */
    private boolean local = false;

    /** Descriptor of the remote server, set by {@link #init}. */
    private ServerDesc server;
    /** The communication socket; null when the session is not connected. */
    private Socket sock = null;

    MessageInputStream nis = null;
    MessageOutputStream nos = null;

    /** Messages sent (or to be sent) and not yet acknowledged. */
    private MessageVector sendList;

    /** Value of the logical clock at the last activity of this session. */
    private long last = 0L;

    /**
     * Creates the session for the specified remote server.
     *
     * @param name  the name of the enclosing network component.
     * @param sid   the identifier of the remote server.
     */
    NetSession(String name, short sid) {
      this.sid = sid;
      this.name = name + ".netSession#" + sid;

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", created");

      running = false;
      canStop = false;
      thread = null;

      sendList = new MessageVector();
    }

    /**
     * Resolves the descriptor of the remote server.
     *
     * @exception UnknownServerException  if the server is unknown.
     */
    void init() throws UnknownServerException {
      server = AgentServer.getServerDesc(sid);
    }

    /**
     * Returns this session's name.
     *
     * @return this session's name.
     */
    public final String getName() {
      return name;
    }

    /**
     * Tries to open the connection from the local side, provided that the
     * retry policy allows it: the delay since the last failed attempt must
     * exceed <code>WDRetryPeriod1</code>, <code>WDRetryPeriod2</code> or
     * <code>WDRetryPeriod3</code> depending on the number of failed
     * attempts. A failure is recorded in the server descriptor.
     */
    void start() {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", started");

      long currentTimeMillis = System.currentTimeMillis();

      if (((server.retry < WDNbRetryLevel1) &&
           ((server.last + WDRetryPeriod1) < currentTimeMillis)) ||
          ((server.retry < WDNbRetryLevel2) &&
           ((server.last + WDRetryPeriod2) < currentTimeMillis)) ||
          ((server.last + WDRetryPeriod3) < currentTimeMillis)) {
        if (localStart()) {
          startEnd();
        } else {
          server.last = currentTimeMillis;
          server.retry += 1;
        }
      }
    }

    /**
     * Starts the session on a connection initiated by the remote server.
     *
     * @param sock  the accepted socket.
     * @param boot  the boot timestamp sent by the remote server.
     */
    void start(Socket sock, int boot) {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", remotely started");

      if (remoteStart(sock, boot)) startEnd();
    }

    /**
     * Closes the streams of a socket then the socket itself, ignoring any
     * error: this is best-effort cleanup of a connection that is being
     * discarded. Does nothing if the socket is null.
     */
    private void closeQuietly(Socket s) {
      if (s == null) return;

      try {
        s.getOutputStream().close();
      } catch (Exception exc) {}
      try {
        s.getInputStream().close();
      } catch (Exception exc) {}
      try {
        s.close();
      } catch (Exception exc) {}
    }

    /**
     * Opens the connection from the local side: connects, sends the local
     * boot timestamp, reads the remote one, checks it in a transaction and
     * creates the message streams.
     *
     * @return true if the connection is established; false if the session
     *         is already connected or connecting, or if any step failed
     *         (the failure is logged and the socket closed).
     */
    boolean localStart() {
      synchronized (this) {
        if ((this.sock != null) || this.local) {
          // The session is already connected, or a local connection is
          // already in progress.
          if (logmon.isLoggable(BasicLevel.WARN))
            logmon.log(BasicLevel.WARN, getName() + ", connection refused");
          return false;
        }

        // Marks that a local connection is in progress.
        this.local = true;
      }

      Socket sock = null;
      try {
        sock = createSocket(server);
        setSocketOption(sock);

        writeBoot(sock.getOutputStream());
        int boot = readAck(sock.getInputStream());

        AgentServer.getTransaction().begin();
        testBootTS(sid, boot);
        AgentServer.getTransaction().commit();
        AgentServer.getTransaction().release();

        nis = new MessageInputStream(sock.getInputStream());
        nos = new MessageOutputStream(sock.getOutputStream());
      } catch (Exception exc) {
        if (logmon.isLoggable(BasicLevel.WARN))
          logmon.log(BasicLevel.WARN,
                     getName() + ", connection refused.", exc);
        // The socket is null if its creation failed.
        closeQuietly(sock);

        this.local = false;
        nis = null;
        nos = null;

        return false;
      }

      this.sock = sock;
      this.local = false;

      return true;
    }

    /**
     * Accepts a connection initiated by the remote server: sends the local
     * boot timestamp as acknowledge, checks the remote one in a transaction
     * and creates the message streams. The connection is refused if the
     * session is already connected, or if a local connection is in progress
     * and the remote server identifier is greater than the local one.
     *
     * @param sock  the accepted socket; closed if the connection is refused.
     * @param boot  the boot timestamp sent by the remote server.
     * @return true if the connection is established, false otherwise.
     */
    synchronized boolean remoteStart(Socket sock, int boot) {
      try {
        if ((this.sock != null) ||
            (this.local && server.sid > AgentServer.getServerId()))
          throw new ConnectException("Already connected");

        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + ", send AckStatus");

        writeAck(sock.getOutputStream());

        AgentServer.getTransaction().begin();
        testBootTS(sid, boot);
        AgentServer.getTransaction().commit();
        AgentServer.getTransaction().release();

        nis = new MessageInputStream(sock.getInputStream());
        nos = new MessageOutputStream(sock.getOutputStream());

        this.sock = sock;

        return true;
      } catch (Exception exc) {
        if (logmon.isLoggable(BasicLevel.WARN))
          logmon.log(BasicLevel.WARN, getName() + ", connection refused", exc);

        closeQuietly(sock);
        nis = null;
        nos = null;
      }
      return false;
    }

    /**
     * Finishes the start of a connected session: registers it in
     * <code>activeSessions</code> (stopping the session with the oldest
     * activity stamp if all slots are used), starts the reading thread and
     * sends again every message still waiting for an acknowledge, dropping
     * those whose notification has expired.
     */
    private void startEnd() {
      server.active = true;
      server.retry = 0;

      synchronized(activeSessions) {
        if (nbActiveCnx < nbMaxCnx) {
          // A slot is free.
          // NOTE(review): slots are never released in this class, so a
          // session that reconnects may be registered more than once --
          // confirm whether this is intended.
          activeSessions[nbActiveCnx++] = this;
        } else {
          // All slots are used: replaces the session whose last activity
          // is the oldest.
          long min = Long.MAX_VALUE;
          int idx = -1;
          for (int i=0; i<nbMaxCnx; i++) {
            if (activeSessions[i].last < min) {
              idx = i;
              min = activeSessions[i].last;
            }
          }
          activeSessions[idx].stop();
          activeSessions[idx] = this;
        }
        last = current++;
      }
      thread = new Thread(this, getName());
      thread.setDaemon(false);

      running = true;
      canStop = true;
      thread.start();

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", connection started");

      // Works on a snapshot of the send list.
      Object [] waiting = sendList.toArray();
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ", send " + waiting.length + " waiting messages");

      Message msg = null;
      long currentTimeMillis = System.currentTimeMillis();
      for (int i=0; i<waiting.length; i++) {
        msg = (Message) waiting[i];
        if ((msg.not != null) &&
            (msg.not.expiration > 0) &&
            (msg.not.expiration < currentTimeMillis)) {
          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG,
                       getName() + ": removes expired notification " +
                       msg.from + ", " + msg.not);
          try {
            // Expired: handled as if the message had been acknowledged.
            doAck(msg.getStamp());
          } catch (IOException exc) {
            logmon.log(BasicLevel.ERROR,
                       getName() + ": cannot removes expired notification " +
                       msg.from + ", " + msg.not, exc);
          }
        } else {
          transmit(msg, currentTimeMillis);
        }
      }
    }

    /**
     * Stops the session: clears the running flag and, when the reading
     * thread is waiting for a message, interrupts it and closes the
     * connection.
     * NOTE(review): the thread reference is dropped after a single
     * uninterrupted join of one second, whether or not the thread has
     * ended -- statement order kept unchanged.
     */
    void stop() {
      running = false;

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", stopped.");

      while ((thread != null) && thread.isAlive()) {
        if (canStop) {
          if (thread.isAlive()) thread.interrupt();
          shutdown();
        }
        try {
          thread.join(1000L);
        } catch (InterruptedException exc) {
          continue;
        }
        thread = null;
      }
    }

    /** Closes the connection (see {@link #close}). */
    public void shutdown() {
      close();
    }

    /**
     * Closes the socket, if any, and forgets the socket and the message
     * streams; errors during closing are ignored.
     */
    synchronized void close() {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", closed.");

      closeQuietly(sock);
      sock = null;

      nis = null;
      nos = null;
    }

    /**
     * Handles the acknowledge of a message: removes the message from the
     * send list, then deletes and frees it within a transaction. An
     * acknowledge for an unknown message is only logged.
     *
     * @param ack  the stamp of the acknowledged message.
     * @exception IOException  propagated from the deletion of the message.
     */
    final private void doAck(int ack) throws IOException {
      Message msg = null;

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", ack received #" + ack);

      try {
        msg = sendList.removeMessage(ack);
        AgentServer.getTransaction().begin();
        msg.delete();
        msg.free();
        AgentServer.getTransaction().commit();
        AgentServer.getTransaction().release();

        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG,
                     getName() + ", remove msg#" + msg.getStamp());
      } catch (NoSuchElementException exc) {
        logmon.log(BasicLevel.WARN,
                   getName() + ", can't ack, unknown msg#" + ack);
      }
    }

    /**
     * Sends a message to the remote server. A message carrying a
     * notification is first added to the send list, and dropped at once if
     * the notification has expired. If the session is not connected a
     * connection attempt is made, otherwise the message is transmitted.
     *
     * @param msg  the message to send; a message without notification is
     *             an acknowledge.
     */
    final void send(Message msg) {
      if (logmon.isLoggable(BasicLevel.DEBUG)) {
        if (msg.not != null) {
          logmon.log(BasicLevel.DEBUG,
                     getName() + ", send msg#" + msg.getStamp());
        } else {
          logmon.log(BasicLevel.DEBUG,
                     getName() + ", send ack#" + msg.getStamp());
        }
      }

      long currentTimeMillis = System.currentTimeMillis();

      if (msg.not != null) {
        sendList.addElement(msg);

        if ((msg.not.expiration > 0) &&
            (msg.not.expiration < currentTimeMillis)) {
          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG,
                       getName() + ": removes expired notification " +
                       msg.from + ", " + msg.not);
          try {
            // Expired: handled as if the message had been acknowledged.
            doAck(msg.getStamp());
          } catch (IOException exc) {
            logmon.log(BasicLevel.ERROR,
                       getName() + ": cannot removes expired notification " +
                       msg.from + ", " + msg.not, exc);
          }
          return;
        }
      }

      if (sock == null) {
        // Not connected: tries to connect. When the connection succeeds
        // startEnd() sends the content of the send list.
        start();
      } else {
        transmit(msg, currentTimeMillis);
      }
    }

    /**
     * Builds the acknowledge of a received message and pushes it in
     * <code>qout</code>.
     *
     * @param stamp  the stamp of the message to acknowledge.
     */
    final private void ack(int stamp) throws Exception {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ", set ack msg#" + stamp);

      Message ack = Message.alloc(AgentId.localId,
                                  AgentId.localId(server.sid),
                                  null);
      ack.source = AgentServer.getServerId();
      ack.dest = AgentServer.getServerDesc(server.sid).gateway;
      ack.stamp = stamp;

      qout.push(ack);
    }

    /**
     * Writes a message on the connection and updates the activity stamp of
     * the session. On I/O error the connection is closed.
     *
     * @param msg                the message to write.
     * @param currentTimeMillis  the current time, passed to the output
     *                           stream.
     */
    final private synchronized void transmit(Message msg,
                                             long currentTimeMillis) {
      last = current++;
      try {
        nos.writeMessage(msg, currentTimeMillis);
      } catch (IOException exc) {
        logmon.log(BasicLevel.ERROR,
                   getName() + ", exception in sending message", exc);
        close();
      } catch (NullPointerException exc) {
        // Deliberately ignored; presumably covers the case where the
        // output stream has been cleared by close() -- TODO confirm.
      }
    }

    /**
     * Body of the reading thread: reads messages until the session is
     * stopped or the connection fails. A message with a notification is
     * delivered then acknowledged; a message without notification is an
     * acknowledge. The connection is always closed on exit.
     */
    public void run() {
      Message msg;

      try {
        while (running) {
          canStop = true;

          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG, getName() + ", waiting message");

          try {
            msg = nis.readMessage();
          } catch (ClassNotFoundException exc) {
            logmon.log(BasicLevel.ERROR,
                       getName() + ", error during waiting message", exc);
            continue;
          } catch (InvalidClassException exc) {
            logmon.log(BasicLevel.ERROR,
                       getName() + ", error during waiting message", exc);
            continue;
          } catch (StreamCorruptedException exc) {
            logmon.log(BasicLevel.ERROR,
                       getName() + ", error during waiting message", exc);
            break;
          } catch (OptionalDataException exc) {
            logmon.log(BasicLevel.ERROR,
                       getName() + ", error during waiting message", exc);
            break;
          } catch (NullPointerException exc) {
            // Presumably the input stream has been cleared by close() --
            // TODO confirm. Ends the thread silently.
            break;
          }

          canStop = false;

          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG, getName() + ", receives: " + msg);

          // Reads the stamp before the message is handed over.
          int stamp = msg.getStamp();
          if (msg.not != null) {
            deliver(msg);
            ack(stamp);
          } else {
            doAck(stamp);
          }
        }
      } catch (EOFException exc) {
        if (running)
          logmon.log(BasicLevel.WARN,
                     this.getName() + ", connection closed", exc);
      } catch (SocketException exc) {
        if (running)
          logmon.log(BasicLevel.WARN,
                     this.getName() + ", connection closed", exc);
      } catch (Exception exc) {
        logmon.log(BasicLevel.ERROR, getName() + ", exited", exc);
      } finally {
        logmon.log(BasicLevel.DEBUG, getName() + ", ends");
        running = false;
        close();
      }
    }

    /**
     * Input stream reading whole messages from the connection into the
     * internal buffer before decoding them.
     */
    final class MessageInputStream extends ByteArrayInputStream {
      private InputStream is = null;

      MessageInputStream(InputStream is) {
        super(new byte[256]);
        this.is = is;
      }

      /**
       * Fills the beginning of the internal buffer with exactly
       * <code>length</code> bytes read from the connection, growing the
       * buffer if needed, and resets the read position.
       *
       * @exception EOFException  if the connection ends before.
       */
      private void readFully(int length) throws IOException {
        count = 0;
        if (length > buf.length) buf = new byte[length];

        int nb = -1;
        do {
          nb = is.read(buf, count, length-count);
          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG, getName() + ", reads:" + nb);
          if (nb < 0) throw new EOFException();
          count += nb;
        } while (count != length);
        pos = 0;
      }

      /**
       * Reads a message: a 4 bytes big-endian length, the message header,
       * then -if the length says so- a flags byte followed by the
       * serialized notification. A positive expiration is converted from
       * a relative delay to a local absolute date.
       * NOTE(review): the length and the serialized object come from the
       * network and are neither bounded nor filtered here.
       *
       * @return the message read; its notification is null for an
       *         acknowledge.
       */
      Message readMessage() throws Exception {
        count = 0;
        readFully(Message.LENGTH +4 -1);
        // Total length of the message, big-endian.
        int length = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) +
          ((buf[2] & 0xFF) << 8) + ((buf[3] & 0xFF) << 0);

        Message msg = Message.alloc();
        int idx = msg.readFromBuf(buf, 4);

        if (length > idx) {
          // The message carries a notification: reads the remaining bytes.
          readFully(length - idx);

          // The first byte holds the notification's flags.
          boolean persistent = (buf[0] & Message.PERSISTENT) != 0;
          boolean detachable = (buf[0] & Message.DETACHABLE) != 0;

          // The serialized notification follows the flags byte.
          pos = 1;
          ObjectInputStream ois = new ObjectInputStream(this);
          msg.not = (Notification) ois.readObject();
          if (msg.not.expiration > 0)
            msg.not.expiration += System.currentTimeMillis();
          msg.not.persistent = persistent;
          msg.not.detachable = detachable;
          msg.not.detached = false;
        } else {
          msg.not = null;
        }

        return msg;
      }
    }

    /**
     * Output stream building each message in the internal buffer before
     * writing it on the connection in a single call.
     */
    final class MessageOutputStream extends ByteArrayOutputStream {
      private OutputStream os = null;
      private ObjectOutputStream oos = null;

      MessageOutputStream(OutputStream os) throws IOException {
        super(256);

        this.os = os;
        oos = new ObjectOutputStream(this);
        // Discards what the ObjectOutputStream constructor wrote, and
        // stores the serialization stream header once at the place where
        // the notification of every message begins.
        count = 0;
        buf[Message.LENGTH +4] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);
        buf[Message.LENGTH +5] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);
        buf[Message.LENGTH +6] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);
        buf[Message.LENGTH +7] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);
      }

      /**
       * Writes a message on the connection: length, header and -if any-
       * flags byte and serialized notification. A positive expiration is
       * sent as a delay relative to <code>currentTimeMillis</code> and
       * restored afterwards.
       *
       * @param msg                the message to write.
       * @param currentTimeMillis  the current time.
       * @exception IOException  if the message cannot be written.
       */
      void writeMessage(Message msg,
                        long currentTimeMillis) throws IOException {
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + ", sends " + msg);

        int idx = msg.writeToBuf(buf, 4);
        // Length of a message without notification.
        count = Message.LENGTH +4 -1;

        try {
          if (msg.not != null) {
            // Flags byte of the notification.
            buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) |
                                 (msg.not.detachable?Message.DETACHABLE:0));

            // The serialized object is appended after the stream header
            // stored by the constructor.
            count = Message.LENGTH +8;

            if (msg.not.expiration > 0)
              msg.not.expiration -= currentTimeMillis;
            oos.writeObject(msg.not);

            oos.reset();
            oos.flush();
          }

          // Total length of the message, big-endian.
          buf[0] = (byte) (count >>> 24);
          buf[1] = (byte) (count >>> 16);
          buf[2] = (byte) (count >>> 8);
          buf[3] = (byte) (count >>> 0);

          os.write(buf, 0, count);
          os.flush();
        } finally {
          // Restores the absolute expiration date and empties the buffer.
          if ((msg.not != null) && (msg.not.expiration > 0))
            msg.not.expiration += currentTimeMillis;
          count = 0;
        }
      }
    }
  }

  /**
   * Daemon accepting incoming connections and handing each of them to the
   * session of the connecting server.
   */
  final class WakeOnConnection extends Daemon {
    ServerSocket listen = null;

    WakeOnConnection(String name, Logger logmon) throws IOException {
      super(name + ".wakeOnConnection");
      listen = createServerSocket();
      this.logmon = logmon;
    }

    /** Closes the listening socket, ignoring any error. */
    protected void close() {
      try {
        listen.close();
      } catch (Exception exc) {}
      listen = null;
    }

    protected void shutdown() {
      close();
    }

    /**
     * Accepts connections while the daemon is running. For each one, reads
     * the boot record of the connecting server and starts the matching
     * session on the socket. If the setup fails before a session has taken
     * the socket, the socket is closed.
     */
    public void run() {
      try {
        while (running) {
          Socket sock = null;
          // True once a session is responsible for the socket.
          boolean handedOff = false;

          try {
            canStop = true;

            try {
              if (this.logmon.isLoggable(BasicLevel.DEBUG))
                this.logmon.log(BasicLevel.DEBUG,
                                this.getName() + ", waiting connection");

              sock = listen.accept();
            } catch (IOException exc) {
              if (running)
                this.logmon.log(BasicLevel.ERROR,
                                this.getName() +
                                ", error during waiting connection", exc);
              continue;
            }
            canStop = false;

            setSocketOption(sock);

            Boot boot = readBoot(sock.getInputStream());
            if (this.logmon.isLoggable(BasicLevel.DEBUG))
              this.logmon.log(BasicLevel.DEBUG,
                              this.getName() + ", connection setup from #" +
                              boot.sid);
            NetSession session = getSession(boot.sid);
            if (session == null)
              throw new ConnectException("No session for server #" + boot.sid);
            // From now on the session closes the socket itself if it
            // refuses the connection.
            handedOff = true;
            session.start(sock, boot.boot);
          } catch (Exception exc) {
            this.logmon.log(BasicLevel.ERROR,
                            this.getName() + ", bad connection setup", exc);
            if (! handedOff && (sock != null)) {
              // Nobody owns the socket: closes it so that it does not
              // leak; an error at this point is of no interest.
              try {
                sock.close();
              } catch (IOException exc2) {}
            }
          }
        }
      } finally {
        finish();
      }
    }
  }

  /**
   * Daemon taking the messages of <code>qout</code> and giving each of
   * them to the session of its destination server.
   */
  final class Dispatcher extends Daemon {
    Dispatcher(String name, Logger logmon) {
      super(name + ".dispatcher");
      this.logmon = logmon;
    }

    protected void close() {}

    protected void shutdown() {}

    /**
     * Forwards messages while the daemon is running; a message is removed
     * from the queue only after it has been given to its session.
     */
    public void run() {
      Message msg = null;

      try {
        while (running) {
          canStop = true;

          if (this.logmon.isLoggable(BasicLevel.DEBUG))
            this.logmon.log(BasicLevel.DEBUG,
                            this.getName() + ", waiting message");
          try {
            msg = qout.get();
          } catch (InterruptedException exc) {
            // Interrupted while waiting: checks the running flag again.
            continue;
          }
          canStop = false;
          if (! running) break;

          getSession(msg.getDest()).send(msg);
          qout.pop();
        }
      } finally {
        finish();
      }
    }
  }

  /**
   * Daemon that periodically tries to restart the sessions that are not
   * running while messages are waiting in their send list.
   */
  final class WatchDog extends Daemon {
    /** Monitor used to wait for the next activation. */
    private Object lock;

    WatchDog(String name, Logger logmon) {
      super(name + ".watchdog");
      lock = new Object();
      this.logmon = logmon;
    }

    protected void close() {}

    protected void shutdown() {
      wakeup();
    }

    /** Ends the current wait of the daemon, if any. */
    void wakeup() {
      synchronized (lock) {
        lock.notify();
      }
    }

    /**
     * Waits at most <code>WDActivationPeriod</code> between two
     * examinations of the sessions, while the daemon is running.
     */
    public void run() {
      try {
        synchronized (lock) {
          while (running) {
            try {
              lock.wait(WDActivationPeriod);
              if (this.logmon.isLoggable(BasicLevel.DEBUG))
                this.logmon.log(BasicLevel.DEBUG,
                                this.getName() + ", activated");
            } catch (InterruptedException exc) {
              continue;
            }

            if (! running) break;

            for (int sid=0; sid<sessions.length; sid++) {
              if ((sessions[sid] != null) &&
                  (sessions[sid].sendList.size() > 0) &&
                  (! sessions[sid].running)) {
                sessions[sid].start();
              }
            }
          }
        }
      } finally {
        finish();
      }
    }
  }

  /**
   * Writes the boot record opening a connection: the local server
   * identifier on 2 bytes then the local boot timestamp on 4 bytes, both
   * big-endian.
   *
   * @param out  the stream to write to; flushed before returning.
   */
  final void writeBoot(OutputStream out) throws IOException {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, getName() + ", writeBoot: " + getBootTS());

    byte[] iobuf = new byte[6];
    iobuf[0] = (byte) (AgentServer.getServerId() >>> 8);
    iobuf[1] = (byte) (AgentServer.getServerId() >>> 0);
    iobuf[2] = (byte) (getBootTS() >>> 24);
    iobuf[3] = (byte) (getBootTS() >>> 16);
    iobuf[4] = (byte) (getBootTS() >>> 8);
    iobuf[5] = (byte) (getBootTS() >>> 0);
    out.write(iobuf);
    out.flush();
  }

  /** Boot record received from a connecting server. */
  final class Boot {
    /** Identifier of the connecting server. */
    transient short sid;
    /** Boot timestamp of the connecting server. */
    transient int boot;
  }

  /**
   * Reads bytes from the stream until the buffer is full.
   *
   * @param is     the stream to read from.
   * @param iobuf  the buffer to fill.
   * @exception EOFException  if the stream ends before the buffer is full.
   */
  final void readFully(InputStream is, byte[] iobuf) throws IOException {
    int n = 0;
    do {
      int count = is.read(iobuf, n, iobuf.length - n);
      if (count < 0) throw new EOFException();
      n += count;
    } while (n < iobuf.length);
  }

  /**
   * Reads a boot record as written by {@link #writeBoot}.
   *
   * @param in  the stream to read from.
   * @return the identifier and boot timestamp of the connecting server.
   */
  final Boot readBoot(InputStream in) throws IOException {
    Boot boot = new Boot();

    byte[] iobuf = new byte[6];
    readFully(in, iobuf);
    boot.sid = (short) (((iobuf[0] & 0xFF) << 8) + (iobuf[1] & 0xFF));
    boot.boot = ((iobuf[2] & 0xFF) << 24) + ((iobuf[3] & 0xFF) << 16) +
      ((iobuf[4] & 0xFF) << 8) + ((iobuf[5] & 0xFF) << 0);

    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, getName() + ", readBoot from #" + boot.sid +
                 " -> " + boot.boot);

    return boot;
  }

  /**
   * Writes the acknowledge of a boot record: the local boot timestamp on
   * 4 bytes, big-endian.
   *
   * @param out  the stream to write to; flushed before returning.
   */
  final void writeAck(OutputStream out) throws IOException {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, getName() + ", writeAck: " + getBootTS());

    byte[] iobuf = new byte[4];
    iobuf[0] = (byte) (getBootTS() >>> 24);
    iobuf[1] = (byte) (getBootTS() >>> 16);
    iobuf[2] = (byte) (getBootTS() >>> 8);
    iobuf[3] = (byte) (getBootTS() >>> 0);
    out.write(iobuf);
    out.flush();
  }

  /**
   * Reads an acknowledge as written by {@link #writeAck}.
   *
   * @param in  the stream to read from.
   * @return the boot timestamp of the remote server.
   */
  final int readAck(InputStream in) throws IOException {
    byte[] iobuf = new byte[4];
    readFully(in, iobuf);
    int boot = ((iobuf[0] & 0xFF) << 24) + ((iobuf[1] & 0xFF) << 16) +
      ((iobuf[2] & 0xFF) << 8) + ((iobuf[3] & 0xFF) << 0);

    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, getName() + ", readAck:" + boot);

    return boot;
  }
}