package fr.dyade.aaa.agent;

import java.io.*;
import java.net.*;
import java.util.*;

import java.nio.*;
import java.nio.channels.*;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.util.Daemon;

/**
 * <code>NGNetwork</code> is a {@link StreamNetwork} implementation built on
 * non-blocking NIO channels.
 * <p>
 * A single {@link Selector} multiplexes the listening channel and every peer
 * channel. Two kinds of daemons cooperate:
 * <ul>
 * <li>the {@link Dispatcher} takes messages from <code>qout</code> and hands
 *     them to the {@link CnxHandler} of the destination server;</li>
 * <li>the {@link NetServer} runs the select loop, accepts incoming
 *     connections, (re)starts outgoing ones and drives reads and writes.</li>
 * </ul>
 * <p>
 * Frame layout as produced by <code>MessageOutputStream</code>: a 4-byte
 * big-endian total length, 24 bytes of header (from, to, source, dest,
 * stamp), then, only when a notification is attached, one flag byte followed
 * by the Java serialization stream of the notification. A 28-byte frame
 * (no notification) is treated by the reader as an acknowledgement.
 */
public class NGNetwork extends StreamNetwork {
  final static int Kb = 1024;
  final static int Mb = 1024 * Kb;

  /** Size requested for the send and receive socket buffers. */
  final static int SO_BUFSIZE = 64 * Kb;

  /** Selector shared by the listen channel and all peer channels. */
  Selector selector = null;

  /** Daemon moving messages from <code>qout</code> to the handlers. */
  Dispatcher dispatcher = null;
  /** Daemons running the select loop. */
  NetServer dmon[] = null;

  /** Number of <code>NetServer</code> daemons created by start(). */
  final static int NbNetServer = 1;

  /** One handler per remote server; the slot of the local server is null. */
  CnxHandler[] handlers = null;

  /**
   * Creates a new network component.
   */
  public NGNetwork() {
    super();
  }

  /**
   * Initializes the component and creates one connection handler for every
   * server of the list except the local one.
   *
   * @param name     the name of this network component.
   * @param port     the listen port.
   * @param servers  the ids of the servers of this domain.
   * @exception Exception  propagated from the superclass or from the
   *                       creation of a handler.
   */
  public void init(String name, int port, short[] servers) throws Exception {
    super.init(name, port, servers);

    // NOTE(review): handlers[i] is built for servers[i] while getHandler()
    // looks up handlers[index(sid)]; this assumes index() maps a server id
    // to its position in this array -- TODO confirm against the superclass.
    handlers = new CnxHandler[servers.length];
    for (int i=0; i<servers.length; i++) {
      if (servers[i] != AgentServer.getServerId())
        handlers[i] = new CnxHandler(getName(), servers[i]);
    }
  }

  /** Non-blocking listening channel, registered for OP_ACCEPT. */
  ServerSocketChannel listen = null;

  /**
   * Opens the listening channel in non-blocking mode, binds it to the
   * component port and registers it with the selector.
   */
  void open() throws IOException {
    listen = ServerSocketChannel.open();
    listen.configureBlocking(false);
    listen.socket().bind(new InetSocketAddress(port));

    listen.register(selector, SelectionKey.OP_ACCEPT);
  }

  /**
   * Closes the listening channel, if any. Best effort: a failure is only
   * traced.
   */
  void close() {
    if (listen != null) {
      try {
        listen.close();
      } catch (IOException exc) {
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + ", close", exc);
      }
    }
    listen = null;
  }

  /**
   * Best-effort close of a channel that is not (or no longer) owned by a
   * handler.
   */
  private void closeQuietly(SocketChannel channel) {
    if (channel == null) return;
    try {
      channel.close();
    } catch (IOException exc) {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", close", exc);
    }
  }

  /**
   * Causes this network component to begin execution: opens the selector,
   * initializes the handlers, opens the listening channel, then creates (if
   * needed) and starts the daemons.
   *
   * @exception Exception  an I/O error is logged then rethrown; any other
   *                       exception is propagated as is.
   */
  public void start() throws Exception {
    try {
      logmon.log(BasicLevel.DEBUG, getName() + ", starting");

      selector = Selector.open();
      for (int i=0; i<handlers.length; i++) {
        if (handlers[i] != null) handlers[i].init();
      }
      open();

      if (dispatcher == null)
        dispatcher = new Dispatcher(getName(), logmon);

      if (dmon == null) {
        dmon = new NetServer[NbNetServer];
        for (int i=0; i<NbNetServer; i++) {
          dmon[i] = new NetServer(getName(), logmon);
        }
      }

      if (! dispatcher.isRunning()) dispatcher.start();
      for (int i=0; i<NbNetServer; i++) {
        if (! dmon[i].isRunning()) dmon[i].start();
      }
    } catch (IOException exc) {
      logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);
      throw exc;
    }
    logmon.log(BasicLevel.DEBUG, getName() + ", started");
  }

  /**
   * Returns the handler in charge of the specified server.
   *
   * @param sid  the id of the remote server.
   * @return     the handler, or null for the slot of the local server.
   */
  final CnxHandler getHandler(short sid) {
    return handlers[index(sid)];
  }

  /**
   * Wakes up the select loop, if the selector is already opened.
   */
  public void wakeup() {
    if (selector != null) selector.wakeup();
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, getName() + ", wakeup");
  }

  /**
   * Stops the daemons then closes the listening channel.
   */
  public void stop() {
    if (dispatcher != null) dispatcher.stop();
    if (dmon != null) {
      for (int i=0; i<NbNetServer; i++) {
        if (dmon[i] != null) dmon[i].stop();
      }
    }
    close();
    logmon.log(BasicLevel.DEBUG, getName() + ", stopped");
  }

  /**
   * Tests if the network component is alive.
   *
   * @return  true if the dispatcher and every <code>NetServer</code> daemon
   *          exist and are running; false otherwise.
   */
  public boolean isRunning() {
    if ((dispatcher == null) || ! dispatcher.isRunning())
      return false;

    if (dmon == null)
      return false;

    for (int i=0; i<NbNetServer; i++) {
      if ((dmon[i] == null) || ! dmon[i].isRunning())
        return false;
    }

    return true;
  }

  /**
   * Returns a string representation of this component, including its
   * daemons when they exist.
   *
   * @return  a string representation of this component.
   */
  public String toString() {
    StringBuffer strbuf = new StringBuffer();

    strbuf.append(super.toString()).append("\n\t");
    if (dispatcher != null)
      strbuf.append(dispatcher.toString()).append("\n");
    for (int i=0; i<NbNetServer; i++) {
      if ((dmon != null) && (dmon[i] != null))
        strbuf.append(dmon[i].toString()).append("\n");
    }

    return strbuf.toString();
  }

  /**
   * Handles a freshly accepted connection: reads the 6-byte handshake
   * (server id then boot timestamp of the peer) and hands the channel over
   * to the handler of that server.
   * <p>
   * The channel is closed here if the handshake cannot be completed; once
   * handed over, the handler is responsible for it.
   *
   * @param channel  the accepted channel, still in blocking mode.
   * @exception IOException  if the handshake fails or no handler exists for
   *                         the announced server id.
   */
  void cnxStart(SocketChannel channel) throws IOException {
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, getName() + ", remotely started");

    CnxHandler cnx = null;
    int boot = 0;
    boolean identified = false;
    try {
      channel.socket().setSendBufferSize(SO_BUFSIZE);
      channel.socket().setReceiveBufferSize(SO_BUFSIZE);
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + " bufsize: " +
                   channel.socket().getReceiveBufferSize() + ", " +
                   channel.socket().getSendBufferSize());

      // A single read may return fewer than 6 bytes: loop until the
      // handshake is complete or the peer closes the stream.
      ByteBuffer buf = ByteBuffer.allocate(6);
      while (buf.hasRemaining()) {
        if (channel.read(buf) < 0)
          throw new EOFException("Connection closed during handshake");
      }
      buf.flip();
      short sid = buf.getShort();
      boot = buf.getInt();

      cnx = getHandler(sid);
      if (cnx == null)
        throw new ConnectException("No handler for server#" + sid);
      identified = true;
    } finally {
      // Nobody owns the channel yet, do not leak it.
      if (! identified) closeQuietly(channel);
    }

    if (cnx.remoteStart(channel, boot)) cnx.startEnd();
  }

  /**
   * Daemon taking the messages of <code>qout</code> one by one and giving
   * them to the handler of their destination server.
   */
  final class Dispatcher extends Daemon {
    Dispatcher(String name, Logger logmon) {
      super(name + ".dispatcher");
      // Overload logmon definition in Daemon
      this.logmon = logmon;
    }

    protected void close() {}

    protected void shutdown() {}

    public void run() {
      Message msg = null;

      try {
        while (running) {
          canStop = true;

          if (this.logmon.isLoggable(BasicLevel.DEBUG))
            this.logmon.log(BasicLevel.DEBUG,
                            this.getName() + ", waiting message");
          try {
            msg = qout.get();
          } catch (InterruptedException exc) {
            // Interrupted while waiting: loop to re-evaluate 'running'.
            continue;
          }
          canStop = false;
          if (! running) break;

          try {
            getHandler(msg.getDest()).send(msg);
          } catch (IOException exc) {
            if (this.logmon.isLoggable(BasicLevel.ERROR))
              this.logmon.log(BasicLevel.ERROR, this.getName(), exc);
          }
          qout.pop();
        }
      } finally {
        finish();
      }
    }
  }

  /**
   * Daemon running the select loop: it (re)starts the connections that have
   * pending messages, accepts incoming connections and drives the reads and
   * writes of the established ones.
   */
  final class NetServer extends Daemon {
    NetServer(String name, Logger logmon) throws IOException {
      super(name + ".NetServer");
      // Overload logmon definition in Daemon
      this.logmon = logmon;
    }

    protected void close() {
    }

    protected void shutdown() {
      close();
    }

    public void run() {
      int nbop = 0;

      try {
        while (running) {
          canStop = true;
          try {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + ", on select");
            nbop = selector.select(WDActivationPeriod);
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + ", on select:" + nbop);
          } catch (IOException exc) {
            // Do not reuse the count of a previous select.
            nbop = 0;
            if (logmon.isLoggable(BasicLevel.WARN))
              logmon.log(BasicLevel.WARN, getName() + ", select failed", exc);
          }

          // Try to start every closed connection with waiting messages.
          for (int idx=0; idx<handlers.length; idx++) {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + ", " + handlers[idx]);

            if ((handlers[idx] != null) &&
                (handlers[idx].sendlist.size() > 0) &&
                (handlers[idx].channel == null)) {
              try {
                handlers[idx].start();
              } catch (IOException exc) {
                this.logmon.log(BasicLevel.WARN,
                                this.getName() + ", can't start cnx#" + idx,
                                exc);
              }
            }
          }

          if (nbop == 0) continue;
          canStop = false;

          Set keys = selector.selectedKeys();
          for (Iterator it = keys.iterator(); it.hasNext(); ) {
            if (! running) break;

            SelectionKey key = (SelectionKey) it.next();
            // The selector never removes keys from the selected set itself.
            it.remove();

            handleKey(key);
          }
        }
      } catch (Throwable exc) {
        logmon.log(BasicLevel.FATAL, getName(), exc);
      } finally {
        // Same termination protocol as the Dispatcher.
        finish();
      }
    }

    /**
     * Processes one selected key. Any failure is logged and, when the key
     * belongs to a connection, that connection (and only that one) is
     * closed.
     */
    private void handleKey(SelectionKey key) {
      // Scoped to this key so that a failure never touches the connection
      // handled in a previous iteration.
      CnxHandler cnx = null;

      if (! key.isValid()) {
        // Cancelled key (closed channel): querying its ready or interest
        // set would throw CancelledKeyException.
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + ": invalid " + key);
        return;
      }

      try {
        if (logmon.isLoggable(BasicLevel.DEBUG)) {
          logmon.log(BasicLevel.DEBUG,
                     getName() + "(1): " + key + " -> " + key.interestOps());
          logmon.log(BasicLevel.DEBUG,
                     getName() + ":" +
                     key.isValid() +
                     key.isAcceptable() +
                     key.isReadable() +
                     key.isWritable());
        }

        if (key.isAcceptable()) {
          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG, getName() + " acceptable");

          ServerSocketChannel server = (ServerSocketChannel) key.channel();

          // Non-blocking accept: null when no connection is pending.
          SocketChannel channel = server.accept();
          if (channel != null) cnxStart(channel);
        } else {
          cnx = (CnxHandler) key.attachment();

          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG,
                       getName() + ": " + key + " -> " + cnx);
          if (key.isValid() && key.isReadable()) {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + " readable");
            cnx.read();
          }
          // read() may have closed the connection, hence the new checks.
          if (key.isValid() && key.isWritable()) {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + " writable");
            cnx.write();
          } else if (key.isValid() && (cnx.sendlist.size() > 0)) {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + " force");
            key.interestOps(key.channel().validOps());
          }
        }
        if (key.isValid() && logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + "(2): " +
                     key + " -> " + key.interestOps());
      } catch (Exception exc) {
        logmon.log(BasicLevel.ERROR, getName(), exc);
        if (cnx != null) {
          try {
            cnx.close();
          } catch (IOException exc2) {
            logmon.log(BasicLevel.ERROR, getName(), exc2);
          }
        }
      }
    }
  }

  /**
   * Handles the connection with one remote server: connection setup from
   * either side, framing and sending of the waiting messages, parsing of
   * the incoming frames and acknowledgements.
   */
  class CnxHandler {
    /** Id of the remote server. */
    private short sid;
    /** Name of this handler, used in traces. */
    private String name = null;
    /** True while a locally initiated connection attempt is in progress. */
    private boolean local = false;
    /** Descriptor of the remote server, resolved by init(). */
    private ServerDesc server;
    /** Established channel, null when the connection is closed. */
    SocketChannel channel = null;

    long lasttry = 0L;

    /** Number of bytes of the current frame still to be written. */
    int nbwrite = 0;
    MessageOutputStream mos = null;
    /** Current outgoing frame, null when nothing is being sent. */
    ByteBuffer bufout = null;

    /** Messages waiting to be sent or acknowledged. */
    MessageVector sendlist = null;

    ByteBuffer bufin = null;
    MessageInputStream mis = null;

    CnxHandler(String name, short sid) throws IOException {
      this.sid = sid;
      this.name = name + ".cnxHandler#" + sid;

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", created");

      mos = new MessageOutputStream();
      bufin = ByteBuffer.allocateDirect(SO_BUFSIZE);
      mis = new MessageInputStream();

      sendlist = new MessageVector();
    }

    /**
     * Resolves the descriptor of the remote server and starts the
     * connection if messages are already waiting.
     */
    void init() throws IOException, UnknownServerException {
      server = AgentServer.getServerDesc(sid);
      if (sendlist.size() > 0) start();
    }

    /**
     * Returns this handler's name.
     *
     * @return this handler's name.
     */
    public final String getName() {
      return name;
    }

    /**
     * Attempts to connect to the remote server if the retry policy allows
     * it: the delay required since the last attempt depends on the number
     * of failed attempts (WDRetryPeriod1, then 2, then 3).
     */
    void start() throws IOException {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", try to start");

      long currentTimeMillis = System.currentTimeMillis();

      if (server == null)
        // init() has not been called yet.
        return;

      if (((server.retry < WDNbRetryLevel1) &&
           ((server.last + WDRetryPeriod1) < currentTimeMillis)) ||
          ((server.retry < WDNbRetryLevel2) &&
           ((server.last + WDRetryPeriod2) < currentTimeMillis)) ||
          ((server.last + WDRetryPeriod3) < currentTimeMillis)) {
        if (localStart()) {
          startEnd();
        } else {
          server.last = currentTimeMillis;
          server.retry += 1;
        }
      }
    }

    /**
     * Opens a connection with the remote server and performs the handshake:
     * sends the local server id and boot timestamp (6 bytes), then waits
     * for the boot timestamp of the peer (4 bytes).
     *
     * @return  true if the connection is established; false if a connection
     *          already exists or is being set up, or if the attempt fails.
     */
    boolean localStart() {
      synchronized (this) {
        if ((this.channel != null) || this.local) {
          // Already connected, or another local attempt is in progress.
          if (logmon.isLoggable(BasicLevel.WARN))
            logmon.log(BasicLevel.WARN, getName() + ", connection refused");
          return false;
        }

        // Lock the connection in order to prevent concurrent attempts.
        this.local = true;
      }

      SocketChannel channel = null;
      try {
        SocketAddress addr = new InetSocketAddress(server.getAddr(),
                                                   server.getPort());
        channel = SocketChannel.open(addr);

        channel.socket().setSendBufferSize(SO_BUFSIZE);
        channel.socket().setReceiveBufferSize(SO_BUFSIZE);
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + " bufsize: " +
                     channel.socket().getReceiveBufferSize() + ", " +
                     channel.socket().getSendBufferSize());

        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG,
                     getName() + ", writeBoot: " + getBootTS());

        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putShort(AgentServer.getServerId());
        buf.putInt(getBootTS());
        buf.flip();
        while (buf.hasRemaining()) channel.write(buf);

        // Read exactly the 4 bytes of the reply: reading more could consume
        // the beginning of the first frame sent by the peer.
        buf.clear();
        buf.limit(4);
        while (buf.hasRemaining()) {
          if (channel.read(buf) < 0)
            throw new ConnectException("Can't get status");
        }
        buf.flip();
        int boot = buf.getInt();

        AgentServer.getTransaction().begin();
        testBootTS(sid, boot);
        AgentServer.getTransaction().commit();
        AgentServer.getTransaction().release();
      } catch (Exception exc) {
        if (logmon.isLoggable(BasicLevel.WARN))
          logmon.log(BasicLevel.WARN,
                     getName() + ", connection refused.", exc);
        // The channel is null if the connection itself failed.
        closeQuietly(channel);

        // Release the connection.
        this.local = false;
        return false;
      }

      this.channel = channel;
      this.local = false;

      return true;
    }

    /**
     * Completes a connection initiated by the remote server: replies with
     * the local boot timestamp and checks the one of the peer.
     * <p>
     * The connection is refused, and the channel closed, if a connection
     * already exists or if a local attempt is in progress and the remote
     * server id is greater than the local one.
     *
     * @param channel  the accepted channel.
     * @param boot     the boot timestamp announced by the peer.
     * @return  true if the connection is accepted.
     */
    synchronized boolean remoteStart(SocketChannel channel, int boot) {
      try {
        if ((this.channel != null) ||
            (this.local && server.sid > AgentServer.getServerId())) {
          throw new ConnectException("Already connected");
        }

        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG,
                     getName() + ", writeBoot: " + getBootTS());

        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(getBootTS());
        buf.flip();
        while (buf.hasRemaining()) channel.write(buf);

        AgentServer.getTransaction().begin();
        testBootTS(sid, boot);
        AgentServer.getTransaction().commit();
        AgentServer.getTransaction().release();

        this.channel = channel;
        return true;
      } catch (Exception exc) {
        if (logmon.isLoggable(BasicLevel.WARN))
          logmon.log(BasicLevel.WARN,
                     getName() + ", connection refused", exc);

        closeQuietly(channel);
      }
      return false;
    }

    /**
     * Ends the connection setup: resets the I/O state, switches the channel
     * to non-blocking mode, registers it with the selector and rewinds the
     * list of waiting messages.
     * <p>
     * If the registration fails the connection is closed, so that a later
     * start() can retry, and the exception is rethrown.
     */
    private void startEnd() throws IOException {
      try {
        nbwrite = 0;
        bufin.clear();
        // Forget any partially received frame of a previous connection.
        mis.resetFrame();

        channel.configureBlocking(false);
        channel.register(selector, channel.validOps(), this);
      } catch (IOException exc) {
        close();
        throw exc;
      }

      server.active = true;
      server.retry = 0;

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ", connection started");

      sendlist.reset();
    }

    /**
     * Adds a message to the list of waiting messages and wakes up the
     * select loop. If the connection is established and idle, the interest
     * set of its key is restored to all valid operations.
     *
     * @param msg  the message to send.
     */
    synchronized void send(Message msg) throws IOException {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ", send message: " + msg);

      sendlist.addMessage(msg);

      if ((channel != null) && (bufout == null)) {
        // Nothing is being written: ask again for write events.
        SelectionKey key = channel.keyFor(selector);
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG,
                     getName() + ", send message, key=" + key);
        if (key != null)
          key.interestOps(channel.validOps());
      }

      if (selector == null) {
        logmon.log(BasicLevel.WARN,
                   getName() + ", network not started.");
      } else {
        selector.wakeup();
      }
    }

    /**
     * Builds the frame of a message directly in the buffer of a
     * <code>ByteArrayOutputStream</code>, then starts writing it to the
     * channel.
     */
    final class MessageOutputStream extends ByteArrayOutputStream {
      private ObjectOutputStream oos = null;

      MessageOutputStream() throws IOException {
        super(256);

        oos = new ObjectOutputStream(this);

        // Discard the stream header written at offset 0 and store it once
        // at the place it takes in every frame carrying a notification.
        count = 0;
        buf[29] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);
        buf[30] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);
        buf[31] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);
        buf[32] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);
      }

      /**
       * Serializes the message into the buffer and performs a first write
       * on the channel; <code>nbwrite</code> holds what remains to write.
       */
      void writeMessage(Message msg) throws IOException {
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + ", writes " + msg);

        // Bytes 0 to 3 (frame length) are written last.
        // Sender's id
        buf[4] = (byte) (msg.from.from >>> 8);
        buf[5] = (byte) (msg.from.from >>> 0);
        buf[6] = (byte) (msg.from.to >>> 8);
        buf[7] = (byte) (msg.from.to >>> 0);
        buf[8] = (byte) (msg.from.stamp >>> 24);
        buf[9] = (byte) (msg.from.stamp >>> 16);
        buf[10] = (byte) (msg.from.stamp >>> 8);
        buf[11] = (byte) (msg.from.stamp >>> 0);
        // Addressee's id
        buf[12] = (byte) (msg.to.from >>> 8);
        buf[13] = (byte) (msg.to.from >>> 0);
        buf[14] = (byte) (msg.to.to >>> 8);
        buf[15] = (byte) (msg.to.to >>> 0);
        buf[16] = (byte) (msg.to.stamp >>> 24);
        buf[17] = (byte) (msg.to.stamp >>> 16);
        buf[18] = (byte) (msg.to.stamp >>> 8);
        buf[19] = (byte) (msg.to.stamp >>> 0);
        // Source server id
        buf[20] = (byte) (msg.source >>> 8);
        buf[21] = (byte) (msg.source >>> 0);
        // Destination server id
        buf[22] = (byte) (msg.dest >>> 8);
        buf[23] = (byte) (msg.dest >>> 0);
        // Stamp of the message
        buf[24] = (byte) (msg.stamp >>> 24);
        buf[25] = (byte) (msg.stamp >>> 16);
        buf[26] = (byte) (msg.stamp >>> 8);
        buf[27] = (byte) (msg.stamp >>> 0);
        count = 28;

        if (msg.not != null) {
          // Attributes of the notification
          buf[28] = (byte) ((msg.not.persistent?0x01:0) |
                            (msg.not.detachable?0x10:0));
          // Skip the flag byte and the preset stream header (29..32).
          count = 33;

          oos.writeObject(msg.not);
          oos.reset();
          oos.flush();
        }

        // Total length of the frame
        buf[0] = (byte) (count >>> 24);
        buf[1] = (byte) (count >>> 16);
        buf[2] = (byte) (count >>> 8);
        buf[3] = (byte) (count >>> 0);

        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + ", writes " + count);

        nbwrite = count;
        bufout = ByteBuffer.wrap(buf, 0, count);
        nbwrite -= channel.write(bufout);
      }
    }

    /**
     * Called when the channel is writable: continues the current frame, or
     * starts the next waiting message. When no message is left, the channel
     * is registered for read events only.
     */
    private synchronized void write() throws IOException {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + " write-1");

      if ((bufout != null) && (nbwrite > 0)) {
        // The current frame is not completely written.
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + " write-2");
        nbwrite -= channel.write(bufout);
      } else {
        if (nbwrite == 0) {
          if (logmon.isLoggable(BasicLevel.DEBUG))
            logmon.log(BasicLevel.DEBUG, getName() + " write-3");
          Message msg = sendlist.nextMessage();
          if (msg == null) {
            bufout = null;
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + " write-4x:" + msg);
            channel.register(selector, SelectionKey.OP_READ, this);
          } else {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + " write-4:" + msg);
            mos.writeMessage(msg);
            if (msg.not == null) {
              // An acknowledgement is not acknowledged itself: forget it
              // as soon as it is sent.
              if (logmon.isLoggable(BasicLevel.DEBUG))
                logmon.log(BasicLevel.DEBUG, getName() + " remove ack sent");
              sendlist.removeCurrent();
            }
          }
        }
      }
    }

    /**
     * Called when the channel is readable: reads the available bytes and
     * feeds the frame parser. A complete 28-byte frame acknowledges a sent
     * message; a complete longer frame is delivered then acknowledged. The
     * connection is closed when the peer has closed the stream.
     */
    private synchronized void read() throws Exception {
      int bytes = channel.read(bufin);

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + " reads: " + bytes);

      if (bytes == 0) return;

      if (bytes < 0) {
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + " cnx remotely closed");
        close();
        return;
      }

      bufin.flip();
      while (bytes > 0) {
        if (mis.length == -1) {
          // Reading the 28 bytes of length and header.
          if ((mis.getCount() + bytes) < 28) {
            bufin.get(mis.getBuffer(), mis.getCount(), bytes);
            mis.setCount(mis.getCount() + bytes);
            bytes = 0;
          } else {
            int missing = 28 - mis.getCount();
            bufin.get(mis.getBuffer(), mis.getCount(), missing);
            bytes -= missing;
            mis.setCount(28);

            Message msg = mis.readHeader();

            if (mis.length == 28) {
              // Header only: this is an acknowledgement.
              if (logmon.isLoggable(BasicLevel.DEBUG))
                logmon.log(BasicLevel.DEBUG,
                           getName() + ", ack received #" + msg.stamp);
              doAck(msg.stamp);
              msg.free();

              mis.resetFrame();
            }
          }
        } else {
          // Reading the body; readHeader() restarted the count at 0.
          int bodyLength = mis.length - 28;
          if ((mis.getCount() + bytes) < bodyLength) {
            bufin.get(mis.getBuffer(), mis.getCount(), bytes);
            mis.setCount(mis.getCount() + bytes);
            bytes = 0;
          } else {
            int missing = bodyLength - mis.getCount();
            bufin.get(mis.getBuffer(), mis.getCount(), missing);
            bytes -= missing;
            mis.setCount(bodyLength);

            Message msg = mis.readMessage();

            // Keep the stamp: the message belongs to the engine once
            // delivered.
            int stamp = msg.getStamp();
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG,
                         getName() + ", message received #" + stamp);
            deliver(msg);
            ack(stamp);

            mis.resetFrame();
          }
        }
      }
      bufin.clear();
    }

    /**
     * Incremental frame parser. The header (28 bytes) is first accumulated
     * at the beginning of the buffer, then the body overwrites it from
     * offset 0. <code>length</code> is -1 while the header is incomplete.
     */
    final class MessageInputStream extends ByteArrayInputStream {
      /** Total length of the current frame, -1 if not yet known. */
      int length = -1;
      /** Message being built from the current frame. */
      Message msg = null;

      MessageInputStream() {
        super(new byte[512]);
        count = 0;
      }

      public void reset() {
        super.reset();
        length = -1;
        msg = null;
      }

      /**
       * Prepares the parser for a new frame: unknown length, no message,
       * empty buffer.
       */
      void resetFrame() {
        length = -1;
        msg = null;
        count = 0;
      }

      byte[] getBuffer() {
        return buf;
      }

      int getCount() {
        return count;
      }

      void setCount(int count) {
        this.count = count;
      }

      /**
       * Decodes the 28 first bytes of the frame, enlarges the buffer if the
       * body does not fit and restarts the count at 0 for the body.
       *
       * @return the allocated message with its header fields set.
       */
      Message readHeader() throws Exception {
        // Length of the frame
        length = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) +
          ((buf[2] & 0xFF) << 8) + ((buf[3] & 0xFF) << 0);

        msg = Message.alloc();
        // Sender's id
        msg.from = new AgentId(
          (short) (((buf[4] & 0xFF) << 8) + (buf[5] & 0xFF)),
          (short) (((buf[6] & 0xFF) << 8) + (buf[7] & 0xFF)),
          ((buf[8] & 0xFF) << 24) + ((buf[9] & 0xFF) << 16) +
          ((buf[10] & 0xFF) << 8) + ((buf[11] & 0xFF) << 0));
        // Addressee's id
        msg.to = new AgentId(
          (short) (((buf[12] & 0xFF) << 8) + (buf[13] & 0xFF)),
          (short) (((buf[14] & 0xFF) << 8) + (buf[15] & 0xFF)),
          ((buf[16] & 0xFF) << 24) + ((buf[17] & 0xFF) << 16) +
          ((buf[18] & 0xFF) << 8) + ((buf[19] & 0xFF) << 0));
        // Source server id
        msg.source = (short) (((buf[20] & 0xFF) << 8) +
                              ((buf[21] & 0xFF) << 0));
        // Destination server id
        msg.dest = (short) (((buf[22] & 0xFF) << 8) +
                            ((buf[23] & 0xFF) << 0));
        // Stamp of the message
        msg.stamp = ((buf[24] & 0xFF) << 24) +
          ((buf[25] & 0xFF) << 16) +
          ((buf[26] & 0xFF) << 8) +
          ((buf[27] & 0xFF) << 0);

        if ((length -28) > buf.length)
          buf = new byte[length -28];

        count = 0;

        return msg;
      }

      /**
       * Decodes the body of the frame: the flag byte then the serialized
       * notification.
       *
       * @return the message, with its notification if the frame has one.
       */
      Message readMessage() throws Exception {
        if (length > 28) {
          // The body has been stored from offset 0: the flag byte (byte 28
          // of the frame) is buf[0] and the object stream starts at 1.
          boolean persistent = ((buf[0] & 0x01) == 0x01);
          boolean detachable = ((buf[0] & 0x10) == 0x10);
          pos = 1;
          ObjectInputStream ois = new ObjectInputStream(this);
          msg.not = (Notification) ois.readObject();
          msg.not.persistent = persistent;
          msg.not.detachable = detachable;
          msg.not.detached = false;
        } else {
          msg.not = null;
        }

        return msg;
      }
    }

    /**
     * Removes the acknowledged message from the list of waiting messages
     * and deletes it. An unknown stamp is only reported.
     *
     * @param ack  the stamp of the acknowledged message.
     */
    private final void doAck(int ack) throws IOException {
      try {
        Message msg = sendlist.removeMessage(ack);
        AgentServer.getTransaction().begin();
        msg.delete();
        msg.free();
        AgentServer.getTransaction().commit();
        AgentServer.getTransaction().release();

        // Do not read the message once freed; its stamp is 'ack'.
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG,
                     getName() + ", remove msg#" + ack);
      } catch (NoSuchElementException exc) {
        logmon.log(BasicLevel.WARN,
                   getName() + ", can't ack, unknown msg#" + ack);
      }
    }

    /**
     * Queues an acknowledgement (a message without notification) for the
     * received message.
     *
     * @param stamp  the stamp of the received message.
     */
    private final void ack(int stamp) throws Exception {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ", set ack msg#" + stamp);

      Message ack = Message.alloc(AgentId.localId,
                                  AgentId.localId(server.sid),
                                  null);
      ack.source = AgentServer.getServerId();
      ack.dest = AgentServer.getServerDesc(server.sid).gateway;
      ack.stamp = stamp;

      send(ack);
    }

    /**
     * Closes the connection: cancels the selection key, closes the channel
     * and resets the output state. Does nothing more than the reset if the
     * connection is already closed.
     */
    void close() throws IOException {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", close");

      SocketChannel closing = channel;
      channel = null;
      if (closing != null) {
        if (selector != null) {
          SelectionKey key = closing.keyFor(selector);
          if (key != null) key.cancel();
        }
        closeQuietly(closing);
      }
      nbwrite = 0;
      bufout = null;
    }

    public String toString() {
      StringBuffer strbuf = new StringBuffer();
      strbuf.append('(').append(super.toString());
      strbuf.append(',').append(name);
      strbuf.append(',').append(channel);
      strbuf.append(',').append(nbwrite);
      strbuf.append(',').append(sendlist).append(')');
      return strbuf.toString();
    }
  }

  /**
   * Growable list of the messages waiting to be sent or acknowledged, with
   * a cursor on the last message handed out by nextMessage().
   */
  final class MessageVector {
    /** Storage; only the first <code>elementCount</code> slots are used. */
    private Message elementData[] = null;

    /** Number of messages in the list. */
    private int elementCount = 0;

    /** Index of the last message returned by nextMessage(), -1 if none. */
    private int current = -1;

    /**
     * Constructs an empty list with room for 20 messages.
     */
    public MessageVector() {
      this.elementData = new Message[20];
    }

    /**
     * Advances the cursor and returns the message it points to.
     *
     * @return  the next message, or null if the cursor is already on the
     *          last one.
     */
    public synchronized Message nextMessage() {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ", nextMessage:" + toString());

      if ((current +1) < elementCount)
        return elementData[++current];
      else
        return null;
    }

    /**
     * Returns the number of messages in this list.
     *
     * @return  the number of messages in this list.
     */
    public synchronized int size() {
      return elementCount;
    }

    /**
     * Rewinds the cursor before the first message.
     */
    public synchronized void reset() {
      current = -1;
    }

    /**
     * Appends a message at the end of the list, doubling the storage when
     * it is full.
     *
     * @param msg  the message to add.
     */
    public synchronized void addMessage(Message msg) {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ", addMessage:" + toString());

      if ((elementCount + 1) > elementData.length) {
        Message oldData[] = elementData;
        elementData = new Message[elementData.length * 2];
        System.arraycopy(oldData, 0, elementData, 0, elementCount);
      }
      elementData[elementCount++] = msg;
    }

    /**
     * Removes the message under the cursor and moves the cursor back by
     * one. Must only be called after nextMessage() returned a message.
     */
    public synchronized void removeCurrent() {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ", removeCurrent:" + toString());

      if (elementCount > (current +1)) {
        System.arraycopy(elementData, current +1,
                         elementData, current, elementCount - current -1);
      }
      elementData[elementCount-1] = null;
      elementCount--;
      current--;
    }

    /**
     * Removes and returns the first message carrying a notification whose
     * stamp matches; the cursor is adjusted if the message was at or before
     * it.
     *
     * @param stamp  the stamp of the message to remove.
     * @return       the removed message.
     * @exception NoSuchElementException  if no such message is in the list.
     */
    public synchronized Message removeMessage(int stamp) {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG,
                   getName() + ", removeMessage:" + toString());

      for (int index=0 ; index<elementCount ; index++) {
        Message msg = elementData[index];

        if ((msg.not != null) && (msg.getStamp() == stamp)) {
          if (elementCount > (index +1)) {
            System.arraycopy(elementData, index +1,
                             elementData, index, elementCount - index - 1);
          }
          elementData[elementCount-1] = null;
          elementCount--;
          if (index <= current) current--;

          return msg;
        }
      }
      throw new NoSuchElementException();
    }

    public String toString() {
      StringBuffer strbuf = new StringBuffer();
      strbuf.append(super.toString());
      strbuf.append(',').append(current);
      strbuf.append(',').append(elementCount);
      for (int i=0; i<elementCount; i++) {
        strbuf.append(",(").append(elementData[i]).append(')');
      }
      return strbuf.toString();
    }
  }
}