package fr.dyade.aaa.agent;

import java.io.*;
import java.net.*;
import java.util.Vector ;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.util.*;

/**
 * Network component that exchanges agent-server messages inside HTTP
 * requests and replies.
 * <p>
 * Depending on the configured port the component runs in one of two modes
 * (see {@link #start()}):
 * <ul>
 * <li>port != 0: <code>NbDaemon</code> {@link NetServerIn} daemons accept
 *     connections on a shared server socket, read "PUT" requests and answer
 *     each one with a reply that may carry an outgoing message;</li>
 * <li>port == 0: a single {@link NetServerOut} daemon connects to the remote
 *     server (directly or through an HTTP proxy) and sends requests.</li>
 * </ul>
 * Each HTTP body starts with a 12-byte header (total length, boot timestamp,
 * acknowledged stamp, all big-endian ints), optionally followed by the
 * message header and the serialized notification.
 */
public class HttpNetwork extends StreamNetwork implements HttpNetworkMBean {
  /** Charset used to encode and decode the HTTP start line and headers. */
  private static final String HTTP_CHARSET = "ISO-8859-1";

  /** Name of the header giving the body size, without the value. */
  private static final String CONTENT_LENGTH = "Content-Length:";

  /** Address resolved from {@link #proxyhost}, null when no proxy is set. */
  private InetAddress proxy = null;

  /**
   * Host name of the HTTP proxy, read in {@link #init} from the system
   * properties "proxyhost" then "&lt;name&gt;.proxyhost"; null means no proxy.
   */
  String proxyhost = null;

  /**
   * Port of the HTTP proxy, read in {@link #init} from the system properties
   * "proxyport" (default 8080) then "&lt;name&gt;.proxyport". Only set when
   * a proxy host is defined.
   */
  int proxyport = 0;

  /**
   * Timeout given to <code>qout.get()</code> by {@link NetServerOut} while it
   * waits for a message to send; once the call returns the daemon opens a
   * connection and sends a request even if it got no message. Read in
   * {@link #init} from the system properties "ActivationPeriod" then
   * "&lt;name&gt;.ActivationPeriod". Presumably expressed in milliseconds
   * (TODO confirm against the queue implementation).
   */
  protected long activationPeriod = 10000L;

  /**
   * Returns the activation period.
   *
   * @return the current value of {@link #activationPeriod}.
   */
  public long getActivationPeriod() {
    return activationPeriod;
  }

  /**
   * Sets the activation period.
   *
   * @param activationPeriod the new value of {@link #activationPeriod}.
   */
  public void setActivationPeriod(long activationPeriod) {
    this.activationPeriod = activationPeriod;
  }

  /**
   * Number of {@link NetServerIn} daemons created by {@link #start()} when
   * the component listens on a port. Read in {@link #init} from the system
   * properties "NbDaemon" then "&lt;name&gt;.NbDaemon".
   */
  int NbDaemon = 1;

  /**
   * Returns the configured number of listening daemons.
   *
   * @return the current value of {@link #NbDaemon}.
   */
  public long getNbDaemon() {
    return NbDaemon;
  }

  /**
   * Creates a new, not yet initialized, network component.
   */
  public HttpNetwork() {
    super();
  }

  /**
   * Descriptor of the remote server contacted by {@link NetServerOut}. It is
   * selected by {@link #start()}; null if no suitable server was found.
   */
  ServerDesc server = null;

  /**
   * Initializes the component, then reads its configuration (activation
   * period, number of daemons, proxy host and port) from system properties.
   * For each setting the global property is read first and may be overridden
   * by the property prefixed with the component name.
   *
   * @param name    the name of the component, also used as property prefix.
   * @param port    the listening port, forwarded to the superclass.
   * @param servers the list of server ids, forwarded to the superclass.
   * @throws Exception if the superclass initialization fails or the proxy
   *                   host name cannot be resolved.
   */
  public void init(String name, int port, short[] servers) throws Exception {
    super.init(name, port, servers);

    activationPeriod = Long.getLong("ActivationPeriod",
                                    activationPeriod).longValue();
    activationPeriod = Long.getLong(name + ".ActivationPeriod",
                                    activationPeriod).longValue();

    NbDaemon = Integer.getInteger("NbDaemon", NbDaemon).intValue();
    NbDaemon = Integer.getInteger(name + ".NbDaemon", NbDaemon).intValue();

    proxyhost = System.getProperty("proxyhost");
    proxyhost = System.getProperty(name + ".proxyhost", proxyhost);
    if (proxyhost != null) {
      proxyport = Integer.getInteger("proxyport", 8080).intValue();
      proxyport = Integer.getInteger(name + ".proxyport", proxyport).intValue();
      proxy = InetAddress.getByName(proxyhost);
    }
  }

  /** Daemons started by this component, null until {@link #start()}. */
  Daemon dmon[] = null;

  /**
   * Starts the component if it is not already running. Selects the first
   * remote server (id different from the local one, port &gt; 0) as target,
   * then creates and starts either the listening daemons (port != 0) or the
   * single connecting daemon (port == 0).
   *
   * @throws Exception if the server socket or a daemon cannot be created;
   *                   an IOException is logged before being rethrown.
   */
  public void start() throws Exception {
    logmon.log(BasicLevel.DEBUG, getName() + ", starting");
    try {
      if (isRunning()) return;

      for (int i=0; i<servers.length; i++) {
        server = AgentServer.getServerDesc(servers[i]);
        if ((server.getServerId() != AgentServer.getServerId()) &&
            (server.getPort() > 0)) {
          logmon.log(BasicLevel.DEBUG, getName() + ", server=" + server);
          break;
        }
        server = null;
      }

      // Build the array locally and publish it only when complete: a failure
      // below must not leave null entries visible to isRunning(), stop() or
      // toString().
      Daemon[] daemons;
      if (port != 0) {
        daemons = new Daemon[NbDaemon];
        ServerSocket listen = createServerSocket();

        for (int i=0; i<NbDaemon; i++) {
          daemons[i] = new NetServerIn(getName() + '.' + i, listen, logmon);
        }
      } else {
        daemons = new Daemon[1];
        daemons[0] = new NetServerOut(getName(), logmon);
      }
      dmon = daemons;

      for (int i=0; i<dmon.length; i++) {
        dmon[i].start();
      }
    } catch (IOException exc) {
      logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);
      throw exc;
    }
    logmon.log(BasicLevel.DEBUG, getName() + ", started");
  }

  /**
   * Does nothing in this implementation.
   */
  public void wakeup() {
  }

  /**
   * Stops every daemon of this component.
   */
  public void stop() {
    if (dmon != null) {
      for (int i=0; i<dmon.length; i++) {
        dmon[i].stop();
      }
    }
    logmon.log(BasicLevel.DEBUG, getName() + ", stopped");
  }

  /**
   * Tests if the component is running.
   *
   * @return true if at least one daemon is running, false otherwise.
   */
  public boolean isRunning() {
    if (dmon != null) {
      for (int i=0; i<dmon.length; i++) {
        if (dmon[i].isRunning()) return true;
      }
    }
    return false;
  }

  /**
   * Returns a description of this component followed by the description of
   * each of its daemons, one per line.
   *
   * @return a string representation of this component.
   */
  public String toString() {
    StringBuffer strbuf = new StringBuffer ();

    strbuf.append(super.toString()).append("\n\t");
    if (dmon != null) {
      for (int i=0; i<dmon.length; i++) {
        strbuf.append(dmon[i].toString()).append("\n\t");
      }
    }
    return strbuf.toString();
  }

  /**
   * Closes the given streams and socket, ignoring null arguments and any
   * failure: closing is best-effort and may race with a shutdown requested
   * from another thread.
   */
  private static void closeQuietly(OutputStream os, InputStream is, Socket socket) {
    if (os != null) {
      try {
        os.close();
      } catch (Exception ignored) {
        // best-effort close
      }
    }
    if (is != null) {
      try {
        is.close();
      } catch (Exception ignored) {
        // best-effort close
      }
    }
    if (socket != null) {
      try {
        socket.close();
      } catch (Exception ignored) {
        // best-effort close
      }
    }
  }

  /**
   * Tests if the message carries a notification with a positive expiration
   * date that is earlier than the given time.
   */
  private static boolean isExpired(Message msg, long currentTimeMillis) {
    return (msg != null) &&
      (msg.not.expiration > 0) &&
      (msg.not.expiration < currentTimeMillis);
  }

  /**
   * Reads one CRLF-terminated line from the stream.
   *
   * @param is  the stream to read from.
   * @param buf the working buffer; its size bounds the accepted line length
   *            (terminator included).
   * @return the line without its terminator, or null if the line is empty
   *         (callers use this as the end-of-headers marker).
   * @throws EOFException if the stream ends before the line terminator.
   * @throws IOException  if the line does not fit in <code>buf</code> or on
   *                      any other I/O error.
   */
  protected String readLine(InputStream is, byte[] buf) throws IOException {
    int i = 0;
    while (true) {
      // Keep the int returned by read(): once narrowed to a byte, -1 can no
      // longer be told apart from the data byte 0xFF.
      int c = is.read();
      if (c == -1)
        throw new EOFException("End of stream while reading HTTP line");
      if (i >= buf.length)
        throw new IOException("HTTP line too long (max " + buf.length + " bytes)");
      buf[i++] = (byte) c;
      if ((c == '\n') && (i >= 2) && (buf[i-2] == '\r')) {
        i -= 2;
        break;
      }
    }

    if (i > 0) return new String (buf, 0, i, HTTP_CHARSET);

    return null;
  }

  /**
   * Reads header lines up to the empty line ending the HTTP header block and
   * returns the value of the Content-Length header. The header name is
   * matched case-insensitively as HTTP field names are case-insensitive.
   *
   * @return the announced body length, 0 if the header is absent.
   */
  private int readContentLength(InputStream is, byte[] buf) throws IOException {
    int length = 0;
    String line;
    while ((line = readLine(is, buf)) != null) {
      if (line.regionMatches(true, 0, CONTENT_LENGTH, 0, CONTENT_LENGTH.length())) {
        length = Integer.parseInt(line.substring(CONTENT_LENGTH.length()).trim());
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, name + ", length:" + length);
      }
    }
    return length;
  }

  /**
   * Writes the HTTP header block followed by the body currently held in
   * <code>nos</code>, then resets <code>nos</code> and flushes the stream.
   */
  private void writeHttpMessage(StringBuffer header,
                                OutputStream os,
                                MessageOutputStream nos) throws IOException {
    os.write(header.toString().getBytes(HTTP_CHARSET));

    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, name + ", writes:" + nos.size());
    nos.writeTo(os);
    nos.reset();

    os.flush();
  }

  /**
   * Sends a "PUT" request whose body carries the acknowledgement and, if any,
   * the message. When a proxy is used the request line holds the absolute
   * URL of the target server and a Host header is added.
   *
   * @param msg               the message to send, null for an empty request
   *                          (the stamp parameter is then -1).
   * @param os                the stream to write the request to.
   * @param nos               the buffer used to build the body.
   * @param ack               the stamp acknowledged to the peer.
   * @param currentTimeMillis the time used to make the expiration relative.
   * @throws Exception on any encoding or I/O error.
   */
  protected void sendRequest(Message msg,
                             OutputStream os,
                             MessageOutputStream nos,
                             int ack,
                             long currentTimeMillis) throws Exception {
    StringBuffer strbuf = new StringBuffer ();

    strbuf.append("PUT ");
    if (proxy != null) {
      strbuf.append("http://").append(server.getHostname()).append(':').append(server.getPort());
    }
    strbuf.append("/msg?from=").append(AgentServer.getServerId());
    strbuf.append("&stamp=");
    if (msg != null) {
      strbuf.append(msg.getStamp());
    } else {
      strbuf.append("-1");
    }
    strbuf.append(" HTTP/1.1");

    // The body is built first: its size is needed for Content-Length.
    nos.writeMessage(msg, ack, currentTimeMillis);

    if (proxy != null)
      strbuf.append("\r\nHost: ").append(server.getHostname());
    strbuf.append("\r\n" +
                  "User-Agent: ScalAgent 1.0\r\n" +
                  "Accept: image/jpeg;q=0.2\r\n" +
                  "Accept-Language: fr, en-us;q=0.50\r\n" +
                  "Accept-Encoding: gzip;q=0.9\r\n" +
                  "Accept-Charset: ISO-8859-1, utf-8;q=0.66\r\n" +
                  "Cache-Control: no-cache\r\n" +
                  "Cache-Control: no-store\r\n" +
                  "Keep-Alive: 300\r\n" +
                  "Connection: keep-alive\r\n" +
                  "Proxy-Connection: keep-alive\r\n" +
                  "Pragma: no-cache\r\n");
    strbuf.append("Content-Length: ").append(nos.size());
    strbuf.append("\r\n" +
                  "Content-Type: image/jpeg\r\n");
    strbuf.append("\r\n");

    writeHttpMessage(strbuf, os, nos);
  }

  /**
   * Reads a request: checks the request line, extracts the id of the sending
   * server from the "from" parameter, skips the headers and loads the body
   * into <code>nis</code>. A body whose size differs from the announced
   * Content-Length is only reported with a warning.
   *
   * @param is  the stream to read the request from.
   * @param nis the buffer receiving the decoded body.
   * @param buf the working buffer used to read header lines.
   * @return the id of the server that sent the request.
   * @throws Exception if the request line is malformed or on I/O error.
   */
  protected short getRequest(InputStream is,
                             MessageInputStream nis,
                             byte[] buf) throws Exception {
    String line = readLine(is, buf);
    if ((line == null) ||
        (! (line.startsWith("GET ") || line.startsWith("PUT ")))) {
      throw new Exception ("Bad request: " + line);
    }

    int idx1 = line.indexOf("?from=");
    if (idx1 == -1) throw new Exception ("Bad request: " + line);
    int idx2 = line.indexOf("&", idx1);
    if (idx2 == -1) throw new Exception ("Bad request: " + line);
    short from = Short.parseShort(line.substring(idx1+6, idx2));

    int length = readContentLength(is, buf);

    int read = nis.readFrom(is);
    if (read != length)
      logmon.log(BasicLevel.WARN,
                 name + ", Bad request length: " + length + ", read: " + read);

    return from;
  }

  /**
   * Sends a "200 OK" reply whose body carries the acknowledgement and, if
   * any, the message. The Date and Last-Modified headers are fixed strings.
   *
   * @param msg               the message to send, null for an empty reply.
   * @param os                the stream to write the reply to.
   * @param nos               the buffer used to build the body.
   * @param ack               the stamp acknowledged to the peer.
   * @param currentTimeMillis the time used to make the expiration relative.
   * @throws Exception on any encoding or I/O error.
   */
  protected void sendReply(Message msg,
                           OutputStream os,
                           MessageOutputStream nos,
                           int ack,
                           long currentTimeMillis) throws Exception {
    StringBuffer strbuf = new StringBuffer ();

    strbuf.append("HTTP/1.1 200 OK\r\n");

    // The body is built first: its size is needed for Content-Length.
    nos.writeMessage(msg, ack, currentTimeMillis);

    strbuf.append("Date: ").append("Fri, 21 Feb 2003 14:30:51 GMT");
    strbuf.append("\r\n" +
                  "Server: ScalAgent 1.0\r\n" +
                  "Last-Modified: ").append("Wed, 19 Apr 2000 08:16:28 GMT");
    strbuf.append("\r\n" +
                  "Cache-Control: no-cache\r\n" +
                  "Cache-Control: no-store\r\n" +
                  "Accept-Ranges: bytes\r\n" +
                  "Keep-Alive: timeout=15, max=100\r\n" +
                  "Connection: Keep-Alive\r\n" +
                  "Proxy-Connection: Keep-Alive\r\n" +
                  "Pragma: no-cache\r\n");
    strbuf.append("Content-Length: ").append(nos.size());
    strbuf.append("\r\n" +
                  "Content-Type: image/gif\r\n");
    strbuf.append("\r\n");

    writeHttpMessage(strbuf, os, nos);
  }

  /**
   * Reads a reply: checks the status line (only "HTTP/1.1 200 OK" and
   * "HTTP/1.1 204 No Content" are accepted), skips the headers and loads the
   * body into <code>nis</code>. A body whose size differs from the announced
   * Content-Length is only reported with a warning.
   *
   * @param is  the stream to read the reply from.
   * @param nis the buffer receiving the decoded body.
   * @param buf the working buffer used to read header lines.
   * @throws Exception if the status line is not accepted or on I/O error.
   */
  protected void getReply(InputStream is,
                          MessageInputStream nis,
                          byte[] buf) throws Exception {
    String line = readLine(is, buf);
    if ((line == null) ||
        ((! line.equals("HTTP/1.1 200 OK")) &&
         (! line.equals("HTTP/1.1 204 No Content")))) {
      throw new Exception ("Bad reply: " + line);
    }

    int length = readContentLength(is, buf);

    int read = nis.readFrom(is);
    if (read != length)
      logmon.log(BasicLevel.WARN,
                 name + ", Bad reply length: " + length + ", read: " + read);
  }

  /**
   * Handles the body last read in <code>nis</code>: if it acknowledges the
   * pending outgoing message that message is removed from the out queue and
   * deleted within a transaction; if it carries a message, the boot
   * timestamp of its source is checked and the message is delivered.
   *
   * @param msgout the last message sent to the peer, may be null.
   * @param nis    the buffer holding the decoded body.
   * @return the stamp of the received message, to acknowledge in the next
   *         exchange, or -1 if the body carried no message.
   * @throws Exception if the transaction or the delivery fails.
   */
  protected int handle(Message msgout,
                       MessageInputStream nis) throws Exception {
    int ack = nis.getAckStamp();

    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 this.getName() + ", handle: " + msgout + ", ack=" + ack);

    if ((msgout != null) && (msgout.stamp == ack)) {
      AgentServer.getTransaction().begin();
      qout.removeMessage(msgout);
      msgout.delete();
      msgout.free();
      AgentServer.getTransaction().commit();
      AgentServer.getTransaction().release();
    }

    Message msg = nis.getMessage();
    if (logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 this.getName() + ", get: " + msg);

    if (msg != null) {
      ack = msg.stamp;
      testBootTS(msg.getSource(), nis.getBootTS());
      deliver(msg);
      return ack;
    }

    return -1;
  }

  /**
   * Creates the socket used to reach a server through the proxy. This
   * implementation simply connects to the proxy; the target host and port
   * are not used here (the target is named in the request line built by
   * {@link #sendRequest}).
   *
   * @param host      the address of the target server (unused).
   * @param port      the port of the target server (unused).
   * @param proxy     the address of the proxy.
   * @param proxyport the port of the proxy.
   * @return a socket connected to the proxy.
   * @throws IOException if the connection cannot be established.
   */
  Socket createTunnelSocket(InetAddress host, int port,
                            InetAddress proxy, int proxyport) throws IOException {
    return createSocket(proxy, proxyport);
  }

  /**
   * Daemon that connects to the remote server and drives the exchange:
   * it sends requests carrying outgoing messages and handles the replies.
   */
  final class NetServerOut extends Daemon {
    Socket socket = null;

    InputStream is = null;
    OutputStream os = null;

    MessageInputStream nis = null;
    MessageOutputStream nos = null;

    NetServerOut(String name, Logger logmon) throws IOException {
      super(name + ".NetServerOut");
      this.logmon = logmon;

      nis = new MessageInputStream();
      nos = new MessageOutputStream();
    }

    /**
     * Opens the connection to the remote server, directly or through the
     * proxy. Through the proxy, a first failure triggers a reset of the
     * server address and a new resolution of the proxy before one retry.
     */
    protected void open() throws IOException {
      socket = null;

      if (proxy == null) {
        socket = createSocket(server);
      } else {
        try {
          socket = createTunnelSocket(server.getAddr(), server.getPort(),
                                      proxy, proxyport);
        } catch (IOException exc) {
          logmon.log(BasicLevel.WARN,
                     this.getName() + ", connection refused, reset addr");
          server.resetAddr();
          proxy = InetAddress.getByName(proxyhost);
          socket = createTunnelSocket(server.getAddr(), server.getPort(),
                                      proxy, proxyport);
        }
      }
      setSocketOption(socket);

      os = socket.getOutputStream();
      is = socket.getInputStream();
    }

    /** Closes the current connection, if any. */
    protected void close() {
      HttpNetwork.closeQuietly(os, is, socket);
    }

    /** Interrupts the daemon thread and closes the current connection. */
    protected void shutdown() {
      thread.interrupt();
      close();
    }

    /**
     * Main loop: waits for a message (at most the activation period), opens
     * a connection, then exchanges requests and replies as long as there is
     * a message to send or a received message to acknowledge. Any failure
     * closes the connection and the loop starts over.
     */
    public void run() {
      Message msgout = null;
      int ack = -1;

      byte[] buf = new byte[120];

      try {
        while (running) {
          canStop = true;
          try {
            try {
              if (logmon.isLoggable(BasicLevel.DEBUG))
                logmon.log(BasicLevel.DEBUG,
                           this.getName() + ", waiting message");

              msgout = qout.get(activationPeriod);
            } catch (InterruptedException exc) {
              if (logmon.isLoggable(BasicLevel.DEBUG))
                logmon.log(BasicLevel.DEBUG,
                           this.getName() + ", interrupted");
            }
            open();

            do {
              if (logmon.isLoggable(BasicLevel.DEBUG))
                logmon.log(BasicLevel.DEBUG,
                           this.getName() + ", sendRequest: " + msgout + ", ack=" + ack);

              // Development trace: a message with an expiration set is a
              // normal condition, so it is not reported above DEBUG.
              if ((msgout != null) && (msgout.not.expiration != -1) &&
                  logmon.isLoggable(BasicLevel.DEBUG))
                logmon.log(BasicLevel.DEBUG,
                           getName() + ": AF YYY " + msgout.not);

              long currentTimeMillis = System.currentTimeMillis();
              // Drops expired messages until a valid one (or none) is found.
              while (HttpNetwork.isExpired(msgout, currentTimeMillis)) {
                if (logmon.isLoggable(BasicLevel.DEBUG))
                  logmon.log(BasicLevel.DEBUG,
                             getName() + ": AF removes expired notification XXX " +
                             msgout.from + ", " + msgout.not);
                qout.removeMessage(msgout);
                msgout.delete();
                msgout.free();

                msgout = qout.get(0L);
              }

              sendRequest(msgout, os, nos, ack, currentTimeMillis);
              getReply(is, nis, buf);

              canStop = false;
              ack = handle(msgout, nis);
              canStop = true;
              msgout = qout.get(0);
            } while (running && ((msgout != null) || (ack != -1)));
          } catch (Exception exc) {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG,
                         this.getName() + ", connection closed", exc);
          } finally {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG,
                         this.getName() + ", connection ends");
            HttpNetwork.closeQuietly(os, is, socket);
            os = null;
            is = null;
            socket = null;
          }
        }
      } finally {
        logmon.log(BasicLevel.WARN, this.getName() + ", exited");
        finish();
      }
    }
  }

  /**
   * Daemon that accepts connections on the shared server socket, handles the
   * incoming requests and answers each of them with a reply that may carry a
   * message for the requesting server.
   */
  final class NetServerIn extends Daemon {
    ServerSocket listen = null;

    Socket socket = null;

    InputStream is = null;
    OutputStream os = null;

    MessageInputStream nis = null;
    MessageOutputStream nos = null;

    NetServerIn(String name, ServerSocket listen, Logger logmon) throws IOException {
      super(name + ".NetServerIn");
      this.listen = listen;
      this.logmon = logmon;

      nis = new MessageInputStream();
      nos = new MessageOutputStream();
    }

    /** Configures the accepted socket and gets its streams. */
    protected void open(Socket socket) throws IOException {
      setSocketOption(socket);

      os = socket.getOutputStream();
      is = socket.getInputStream();

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, this.getName() + ", connected");
    }

    /**
     * Closes the current connection, if any, and the listening socket (which
     * is the one handed to the constructor).
     */
    protected void close() {
      HttpNetwork.closeQuietly(os, is, socket);
      try {
        listen.close();
      } catch (Exception ignored) {
        // best-effort close
      }
    }

    /** Closes the sockets so that the daemon thread gets out of its I/O. */
    protected void shutdown() {
      close();
    }

    /**
     * Main loop: accepts a connection, then for each request handles its
     * body, picks the next non-expired message for the requesting server and
     * sends it back in the reply. Any failure closes the connection and the
     * daemon waits for a new one.
     */
    public void run() {
      Message msgout= null;
      int ack = -1;

      byte[] buf = new byte[120];

      try {
        while (running) {
          canStop = true;

          try {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG,
                         this.getName() + ", waiting connection");
            socket = listen.accept();
            open(socket);

            short from = getRequest(is, nis, buf);
            do {
              canStop = false;
              ack = handle(msgout, nis);
              canStop = true;

              // Taken for each reply: a kept-alive connection may last long
              // and both the expiry test and the relative expiration sent
              // to the peer need the current time.
              long currentTimeMillis = System.currentTimeMillis();

              // Drops expired messages until a valid one (or none) is found.
              while (true) {
                msgout = qout.getMessageTo(from);

                // Development trace: a message with an expiration set is a
                // normal condition, so it is not reported above DEBUG.
                if ((msgout != null) && (msgout.not.expiration != -1) &&
                    logmon.isLoggable(BasicLevel.DEBUG))
                  logmon.log(BasicLevel.DEBUG,
                             getName() + ": AF YYY " + msgout.not);

                if (! HttpNetwork.isExpired(msgout, currentTimeMillis)) break;

                if (logmon.isLoggable(BasicLevel.DEBUG))
                  logmon.log(BasicLevel.DEBUG,
                             getName() + ": AF removes expired notification " +
                             msgout.from + ", " + msgout.not);
                qout.removeMessage(msgout);
                msgout.delete();
                msgout.free();
              }

              if (logmon.isLoggable(BasicLevel.DEBUG))
                logmon.log(BasicLevel.DEBUG,
                           this.getName() + ", sendReply: " + msgout);

              sendReply(msgout, os, nos, ack, currentTimeMillis);

              if (logmon.isLoggable(BasicLevel.DEBUG))
                logmon.log(BasicLevel.DEBUG,
                           getName() + ": AF WWW " + msgout);

              getRequest(is, nis, buf);
            } while (running);
          } catch (Exception exc) {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG,
                         this.getName() + ", connection closed", exc);
          } finally {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG,
                         this.getName() + ", connection ends");
            HttpNetwork.closeQuietly(os, is, socket);
            os = null;
            is = null;
            socket = null;
          }
        }
      } finally {
        logmon.log(BasicLevel.WARN, this.getName() + ", exited");
        finish();
      }
    }
  }

  /**
   * Buffer used to read and decode the body of a request or a reply. The
   * inherited byte array is refilled for each part of the body, then this
   * stream is used as source for the deserialization of the notification.
   */
  final class MessageInputStream extends ByteArrayInputStream {
    MessageInputStream() {
      super(new byte[256]);
    }

    /**
     * Fills the internal buffer with exactly <code>length</code> bytes read
     * from the stream, growing the buffer if needed, and rewinds this stream
     * on them.
     *
     * @throws EOFException if the stream ends before all bytes are read.
     */
    private void readFully(InputStream is, int length) throws IOException {
      count = 0;
      // NOTE(review): length comes from the peer and is not bounded, a bogus
      // value can trigger a very large allocation.
      if (length > buf.length) buf = new byte[length];

      int nb = -1;
      do {
        nb = is.read(buf, count, length-count);
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + ", reads:" + nb);
        if (nb < 0) throw new EOFException();
        count += nb;
      } while (count != length);
      pos = 0;
    }

    /** Total length announced in the first 4 bytes of the body. */
    int msgLen;
    /** Boot timestamp of the peer, bytes 4 to 7 of the body. */
    int msgBoot;
    /** Stamp acknowledged by the peer, bytes 8 to 11 of the body. */
    int msgAck;

    /** Message decoded by the last {@link #readFrom}, null if none. */
    Message msg = null;

    /**
     * Reads and decodes a body: the 12-byte header then, if the announced
     * length says so, the message header and the serialized notification.
     * A positive expiration is received relative and is made absolute
     * again using the local clock.
     *
     * @param is the stream to read the body from.
     * @return the number of bytes consumed: the announced length when a
     *         message is present, 12 otherwise.
     * @throws Exception on I/O or deserialization error.
     */
    int readFrom(InputStream is) throws Exception {
      readFully(is, 12);
      msgLen = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) +
        ((buf[2] & 0xFF) <<  8) + ((buf[3] & 0xFF) <<  0);
      msgBoot = ((buf[4] & 0xFF) << 24) + ((buf[5] & 0xFF) << 16) +
        ((buf[6] & 0xFF) <<  8) + ((buf[7] & 0xFF) <<  0);
      msgAck = ((buf[8] & 0xFF) << 24) + ((buf[9] & 0xFF) << 16) +
        ((buf[10] & 0xFF) <<  8) + ((buf[11] & 0xFF) <<  0);

      if (msgLen > (Message.LENGTH +11)) {
        msg = Message.alloc();
        readFully(is, Message.LENGTH);

        int idx = msg.readFromBuf(buf, 0);
        // The byte following the message header holds the notification flags.
        boolean persistent = ((buf[idx] & Message.PERSISTENT) != 0);
        boolean detachable = ((buf[idx] & Message.DETACHABLE) != 0);

        readFully(is, msgLen - (Message.LENGTH +12));
        // NOTE(review): Java-native deserialization of data received from
        // the network; only safe if every peer is trusted.
        ObjectInputStream ois = new ObjectInputStream(this);
        msg.not = (Notification) ois.readObject();
        if (msg.not.expiration > 0) {
          msg.not.expiration += System.currentTimeMillis();
        }
        msg.not.persistent = persistent;
        msg.not.detachable = detachable;
        msg.not.detached = false;

        return msgLen;
      }
      msg = null;
      return 12;
    }

    /** Returns the total length announced by the last body read. */
    int getLength() {
      return msgLen;
    }

    /** Returns the boot timestamp carried by the last body read. */
    int getBootTS() {
      return msgBoot;
    }

    /** Returns the stamp acknowledged by the last body read. */
    int getAckStamp() {
      return msgAck;
    }

    /** Returns the message carried by the last body read, null if none. */
    Message getMessage() {
      return msg;
    }
  }

  /**
   * Buffer used to build the body of a request or a reply. The object
   * stream header is stored once in the buffer at the offset where the
   * serialized notification begins, so each message only needs the
   * notification itself to be written after it.
   */
  final class MessageOutputStream extends ByteArrayOutputStream {
    private ObjectOutputStream oos = null;

    MessageOutputStream() throws IOException {
      super(256);
      oos = new ObjectOutputStream(this);
      // Discards the stream header written at offset 0 by the constructor
      // above and stores it where the notification is serialized.
      count = 0;
      buf[Message.LENGTH +12] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);
      buf[Message.LENGTH +13] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);
      buf[Message.LENGTH +14] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);
      buf[Message.LENGTH +15] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);
    }

    /**
     * Builds a body in the buffer: total length, boot timestamp and
     * acknowledged stamp then, if <code>msg</code> is not null, the message
     * header, the notification flags and the serialized notification.
     * A positive expiration is sent relative to
     * <code>currentTimeMillis</code>; the notification held in memory gets
     * its absolute value back whatever happens.
     *
     * @param msg               the message to write, may be null.
     * @param ack               the stamp acknowledged to the peer.
     * @param currentTimeMillis the time used to make the expiration relative.
     * @throws IOException if the notification cannot be serialized.
     */
    void writeMessage(Message msg, int ack,
                      long currentTimeMillis) throws IOException {
      // Bytes 4-7: boot timestamp of this server.
      buf[4] = (byte) (getBootTS() >>>  24);
      buf[5] = (byte) (getBootTS() >>>  16);
      buf[6] = (byte) (getBootTS() >>>  8);
      buf[7] = (byte) (getBootTS() >>>  0);

      // Bytes 8-11: stamp of the acknowledged message.
      buf[8] = (byte) (ack >>>  24);
      buf[9] = (byte) (ack >>>  16);
      buf[10] = (byte) (ack >>>  8);
      buf[11] = (byte) (ack >>>  0);

      count = 12;
      if (msg != null) {
        int idx = msg.writeToBuf(buf, 12);
        buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) |
                             (msg.not.detachable?Message.DETACHABLE:0));
        // Skips the message header and the stored object stream header.
        count = (Message.LENGTH + 12 +4);

        // Remembers whether the expiration was shifted: the shifted value
        // may be <= 0 and cannot be used to decide on the restoration.
        boolean relative = false;
        try {
          if (msg.not.expiration > 0) {
            msg.not.expiration -= currentTimeMillis;
            relative = true;
          }
          oos.writeObject(msg.not);
          oos.reset();
          oos.flush();
        } finally {
          if (relative && (msg.not != null)) {
            msg.not.expiration += currentTimeMillis;
          }
        }
      }
      // Bytes 0-3: total length of the body.
      buf[0] = (byte) (count >>>  24);
      buf[1] = (byte) (count >>>  16);
      buf[2] = (byte) (count >>>  8);
      buf[3] = (byte) (count >>>  0);
    }
  }
}