1 16 package org.apache.axis.attachments; 17 18 import org.apache.axis.InternalException; 19 import org.apache.axis.MessageContext; 20 import org.apache.axis.components.logger.LogFactory; 21 import org.apache.axis.utils.Messages; 22 import org.apache.commons.logging.Log; 23 24 import java.io.File ; 25 import java.io.BufferedInputStream ; 26 27 33 public class ManagedMemoryDataSource implements javax.activation.DataSource { 34 35 36 protected static Log log = 37 LogFactory.getLog(ManagedMemoryDataSource.class.getName()); 38 39 43 protected String contentType = "application/octet-stream"; 44 45 46 java.io.InputStream ss = null; 47 48 49 public static final int MIN_MEMORY_DISK_CACHED = -1; 50 51 52 public static final int MAX_MEMORY_DISK_CACHED = 16 * 1024; 53 54 55 protected int maxCached = MAX_MEMORY_DISK_CACHED; 57 59 60 protected java.io.File diskCacheFile = null; 61 62 64 65 protected java.util.WeakHashMap readers = new java.util.WeakHashMap (); 66 67 70 protected boolean deleted = 71 false; 72 73 75 76 public static final int READ_CHUNK_SZ = 32 * 1024; 77 78 79 protected boolean debugEnabled = false; 81 83 86 protected ManagedMemoryDataSource() { 87 } 88 89 99 public ManagedMemoryDataSource( 100 java.io.InputStream ss, int maxCached, String contentType) 101 throws java.io.IOException { 102 this(ss, maxCached, contentType, false); 103 } 104 105 116 public ManagedMemoryDataSource( 117 java.io.InputStream ss, int maxCached, String contentType, boolean readall) 118 throws java.io.IOException { 119 120 if(ss instanceof BufferedInputStream ) { 121 this.ss = ss; 122 } else { 123 this.ss = new BufferedInputStream (ss); 124 } 125 this.maxCached = maxCached; 126 127 if ((null != contentType) && (contentType.length() != 0)) { 128 this.contentType = contentType; 129 } 130 131 if (maxCached < MIN_MEMORY_DISK_CACHED) { 132 throw new IllegalArgumentException ( 133 Messages.getMessage("badMaxCached", "" + maxCached)); 134 } 135 136 if (log.isDebugEnabled()) { 137 debugEnabled = true; } 139 140 if (readall) { 142 byte[] readbuffer = new byte[READ_CHUNK_SZ]; 143 int read = 0; 144 145 do { 146 read = ss.read(readbuffer); 147 148 if (read > 0) { 149 write(readbuffer, read); 150 } 151 } while (read > -1); 152 153 close(); 154 } 155 } 156 157 158 159 163 public java.lang.String getContentType() { 164 return contentType; 165 } 166 167 173 public synchronized java.io.InputStream getInputStream() 174 throws java.io.IOException { 175 176 182 return new Instream(); } 184 185 191 public java.lang.String getName() { 192 193 String ret = null; 194 195 try { 196 flushToDisk(); 197 198 if (diskCacheFile != null) { 199 ret = diskCacheFile.getAbsolutePath(); 200 } 201 } catch (Exception e) { 202 diskCacheFile = null; 203 } 204 205 return ret; 206 } 207 208 218 public java.io.OutputStream getOutputStream() throws java.io.IOException { 219 return null; 220 } 221 222 223 protected java.util.LinkedList memorybuflist = 224 new java.util.LinkedList (); 225 226 227 protected byte[] currentMemoryBuf = null; 228 229 230 protected int currentMemoryBufSz = 231 0; 232 233 234 protected long totalsz = 0; 235 236 237 protected java.io.BufferedOutputStream cachediskstream = 238 null; 239 240 241 protected boolean closed = false; 242 243 249 protected void write(byte[] data) throws java.io.IOException { 250 write(data, data.length); 251 } 252 253 263 protected synchronized void write(byte[] data, int length) 264 throws java.io.IOException { 265 266 if (closed) { 267 throw new java.io.IOException (Messages.getMessage("streamClosed")); 268 } 269 270 int writesz = length; 271 int byteswritten = 0; 272 273 if ((null != memorybuflist) 274 && (totalsz + writesz > maxCached)) { if (null == cachediskstream) { flushToDisk(); 277 } 278 } 279 280 if (memorybuflist != null) { do { 282 if (null == currentMemoryBuf) { 283 currentMemoryBuf = new byte[READ_CHUNK_SZ]; 284 currentMemoryBufSz = 0; 285 286 memorybuflist.add(currentMemoryBuf); 287 } 288 289 int bytes2write = Math.min((writesz - byteswritten), 291 (currentMemoryBuf.length 292 - currentMemoryBufSz)); 293 294 System.arraycopy(data, byteswritten, currentMemoryBuf, 296 currentMemoryBufSz, bytes2write); 297 298 byteswritten += bytes2write; 299 currentMemoryBufSz += bytes2write; 300 301 if (byteswritten 302 < writesz) { currentMemoryBuf = new byte[READ_CHUNK_SZ]; 304 currentMemoryBufSz = 0; 305 306 memorybuflist.add(currentMemoryBuf); } 308 } while (byteswritten < writesz); 309 } 310 311 if (null != cachediskstream) { cachediskstream.write(data, 0, length); 313 } 314 315 totalsz += writesz; 316 317 return; 318 } 319 320 326 protected synchronized void close() throws java.io.IOException { 327 328 if (!closed) { 329 closed = true; 331 if (null != cachediskstream) { cachediskstream.close(); 333 334 cachediskstream = null; 335 } 336 337 if (null != memorybuflist) { if (currentMemoryBufSz > 0) { 339 byte[] tmp = 340 new byte[currentMemoryBufSz]; 342 System.arraycopy(currentMemoryBuf, 0, tmp, 0, 343 currentMemoryBufSz); 344 memorybuflist.set( 345 memorybuflist.size() - 1, 346 tmp); } 348 349 currentMemoryBuf = null; } 351 } 352 } 353 354 protected void finalize() throws Throwable { 355 356 if (null != cachediskstream) { cachediskstream.close(); 358 359 cachediskstream = null; 360 } 361 } 362 363 369 protected void flushToDisk() 370 throws java.io.IOException , java.io.FileNotFoundException { 371 372 java.util.LinkedList ml = memorybuflist; 373 374 log.debug(Messages.getMessage("maxCached", "" + maxCached, 375 "" + totalsz)); 376 377 if (ml != null) { 378 if (null == cachediskstream) { try { 380 MessageContext mc = MessageContext.getCurrentContext(); 381 String attdir = (mc == null) 382 ? null 383 : mc.getStrProp( 384 MessageContext.ATTACHMENTS_DIR); 385 386 diskCacheFile = java.io.File.createTempFile("Axis", ".att", 387 (attdir == null) 388 ? null 389 : new File ( 390 attdir)); 391 392 if(log.isDebugEnabled()) { 393 log.debug( 394 Messages.getMessage( 395 "diskCache", diskCacheFile.getAbsolutePath())); 396 } 397 398 cachediskstream = new java.io.BufferedOutputStream ( 399 new java.io.FileOutputStream (diskCacheFile)); 400 401 int listsz = ml.size(); 402 403 for (java.util.Iterator it = ml.iterator(); 405 it.hasNext();) { 406 byte[] rbuf = (byte[]) it.next(); 407 int bwrite = (listsz-- == 0) 408 ? currentMemoryBufSz 409 : rbuf.length; 410 411 cachediskstream.write(rbuf, 0, bwrite); 412 413 if (closed) { 414 cachediskstream.close(); 415 416 cachediskstream = null; 417 } 418 } 419 420 memorybuflist = null; 421 } catch (java.lang.SecurityException se) { 422 diskCacheFile = null; 423 cachediskstream = null; 424 maxCached = java.lang.Integer.MAX_VALUE; 425 426 log.info(Messages.getMessage("nodisk00"), se); 427 } 428 } 429 } 430 } 431 432 public synchronized boolean delete() { 433 434 boolean ret = false; 435 436 deleted = true; 437 438 memorybuflist = null; 439 440 if (diskCacheFile != null) { 441 if (cachediskstream != null) { 442 try { 443 cachediskstream.close(); 444 } catch (Exception e) { 445 } 446 447 cachediskstream = null; 448 } 449 450 Object [] array = readers.keySet().toArray(); 451 for (int i = 0; i < array.length; i++) { 452 Instream stream = (Instream) array[i]; 453 if (null != stream) { 454 try { 455 stream.close(); 456 } catch (Exception e) { 457 } 458 } 459 } 460 readers.clear(); 461 462 try { 463 diskCacheFile.delete(); 464 465 ret = true; 466 } catch (Exception e) { 467 468 diskCacheFile.deleteOnExit(); 470 } 471 } 472 473 474 return ret; 475 } 476 477 479 480 protected static Log is_log = 481 LogFactory.getLog(Instream.class.getName()); 482 483 487 private class Instream extends java.io.InputStream { 488 489 490 protected int bread = 0; 491 492 493 java.io.FileInputStream fin = null; 494 495 496 int currentIndex = 497 0; 498 499 500 byte[] currentBuf = null; 501 502 503 int currentBufPos = 0; 504 505 506 boolean readClosed = false; 507 508 514 protected Instream() throws java.io.IOException { 515 516 if (deleted) { 517 throw new java.io.IOException ( 518 Messages.getMessage("resourceDeleted")); 519 } 520 521 readers.put(this, null); 522 } 523 524 532 public int available() throws java.io.IOException { 533 534 if (deleted) { 535 throw new java.io.IOException ( 536 Messages.getMessage("resourceDeleted")); 537 } 538 539 if (readClosed) { 540 throw new java.io.IOException ( 541 Messages.getMessage("streamClosed")); 542 } 543 int ret = new Long (totalsz - bread).intValue(); 545 546 if (debugEnabled) { 547 is_log.debug("available() = " + ret + "."); 548 } 549 550 return ret; 551 } 552 553 560 public int read() throws java.io.IOException { 561 562 synchronized (ManagedMemoryDataSource.this) { 563 byte[] retb = new byte[1]; 564 int br = read(retb, 0, 1); 565 566 if (br == -1) { 567 return -1; 568 } 569 return 0xFF & retb[0]; 570 } 571 } 572 573 578 public boolean markSupported() { 579 580 if (debugEnabled) { 581 is_log.debug("markSupported() = " + false + "."); 582 } 583 584 return false; 585 } 586 587 592 public void mark(int readlimit) { 593 594 if (debugEnabled) { 595 is_log.debug("mark()"); 596 } 597 } 598 599 604 public void reset() throws java.io.IOException { 605 606 if (debugEnabled) { 607 is_log.debug("reset()"); 608 } 609 610 throw new java.io.IOException (Messages.getMessage("noResetMark")); 611 } 612 613 public long skip(long skipped) throws java.io.IOException { 614 615 if (debugEnabled) { 616 is_log.debug("skip(" + skipped + ")."); 617 } 618 619 if (deleted) { 620 throw new java.io.IOException ( 621 Messages.getMessage("resourceDeleted")); 622 } 623 624 if (readClosed) { 625 throw new java.io.IOException ( 626 Messages.getMessage("streamClosed")); 627 } 628 629 if (skipped < 1) { 630 return 0; } 632 633 synchronized (ManagedMemoryDataSource.this) { 634 skipped = Math.min(skipped, 635 totalsz 636 - bread); 638 if (skipped == 0) { 639 return 0; 640 } 641 642 java.util.List ml = memorybuflist; int bwritten = 0; 644 645 if (ml != null) { 646 if (null == currentBuf) { currentBuf = (byte[]) ml.get(currentIndex); 648 currentBufPos = 0; } 650 651 do { 652 long bcopy = Math.min(currentBuf.length 653 - currentBufPos, 654 skipped - bwritten); 655 656 bwritten += bcopy; 657 currentBufPos += bcopy; 658 659 if (bwritten < skipped) { 660 currentBuf = (byte[]) ml.get(++currentIndex); 661 currentBufPos = 0; 662 } 663 } while (bwritten < skipped); 664 } 665 666 if (null != fin) { 667 fin.skip(skipped); 668 } 669 670 bread += skipped; 671 } 672 673 if (debugEnabled) { 674 is_log.debug("skipped " + skipped + "."); 675 } 676 677 return skipped; 678 } 679 680 public int read(byte[] b, int off, int len) throws java.io.IOException { 681 682 if (debugEnabled) { 683 is_log.debug(this.hashCode() + " read(" + off + ", " + len 684 + ")"); 685 } 686 687 if (deleted) { 688 throw new java.io.IOException ( 689 Messages.getMessage("resourceDeleted")); 690 } 691 692 if (readClosed) { 693 throw new java.io.IOException ( 694 Messages.getMessage("streamClosed")); 695 } 696 697 if (b == null) { 698 throw new InternalException(Messages.getMessage("nullInput")); 699 } 700 701 if (off < 0) { 702 throw new IndexOutOfBoundsException ( 703 Messages.getMessage("negOffset", "" + off)); 704 } 705 706 if (len < 0) { 707 throw new IndexOutOfBoundsException ( 708 Messages.getMessage("length", "" + len)); 709 } 710 711 if (len + off > b.length) { 712 throw new IndexOutOfBoundsException ( 713 Messages.getMessage("writeBeyond")); 714 } 715 716 if (len == 0) { 717 return 0; 718 } 719 720 int bwritten = 0; 721 722 synchronized (ManagedMemoryDataSource.this) { 723 if (bread == totalsz) { 724 return -1; 725 } 726 727 java.util.List ml = memorybuflist; 728 729 long longlen = len; 730 longlen = Math.min( 731 longlen, 732 totalsz 733 - bread); len = new Long (longlen).intValue(); 735 736 if (debugEnabled) { 737 is_log.debug("len = " + len); 738 } 739 740 if (ml != null) { 741 if (null == currentBuf) { currentBuf = (byte[]) ml.get(currentIndex); 743 currentBufPos = 0; } 745 746 do { 747 748 int bcopy = Math.min(currentBuf.length - currentBufPos, 750 len - bwritten); 751 752 System.arraycopy(currentBuf, currentBufPos, b, 754 off + bwritten, bcopy); 755 756 bwritten += bcopy; 757 currentBufPos += bcopy; 758 759 if (bwritten < len) { currentBuf = (byte[]) ml.get(++currentIndex); 761 currentBufPos = 0; 762 } 763 } while (bwritten < len); 764 } 765 766 if ((bwritten == 0) && (null != diskCacheFile)) { 767 if (debugEnabled) { 768 is_log.debug(Messages.getMessage("reading", "" + len)); 769 } 770 771 if (null == fin) { if (debugEnabled) { 773 is_log.debug( 774 Messages.getMessage( 775 "openBread", 776 diskCacheFile.getCanonicalPath())); 777 } 778 779 if (debugEnabled) { 780 is_log.debug(Messages.getMessage("openBread", 781 "" + bread)); 782 } 783 784 fin = new java.io.FileInputStream (diskCacheFile); 785 786 if (bread > 0) { 787 fin.skip(bread); } 789 } 790 791 if (cachediskstream != null) { 792 if (debugEnabled) { 793 is_log.debug(Messages.getMessage("flushing")); 794 } 795 796 cachediskstream.flush(); 797 } 798 799 if (debugEnabled) { 800 is_log.debug(Messages.getMessage("flushing")); 801 is_log.debug("len=" + len); 802 is_log.debug("off=" + off); 803 is_log.debug("b.length=" + b.length); 804 } 805 806 bwritten = fin.read(b, off, len); 807 } 808 809 if (bwritten > 0) { 810 bread += bwritten; 811 } 812 } 813 814 if (debugEnabled) { 815 is_log.debug(this.hashCode() 816 + Messages.getMessage("read", "" + bwritten)); 817 } 818 819 return bwritten; 820 } 821 822 827 public synchronized void close() throws java.io.IOException { 828 829 if (debugEnabled) { 830 is_log.debug("close()"); 831 } 832 833 if (!readClosed) { 834 readers.remove(this); 835 836 readClosed = true; 837 838 if (fin != null) { 839 fin.close(); 840 } 841 842 fin = null; 843 } 844 } 845 846 protected void finalize() throws Throwable { 847 close(); 848 } 849 } 851 853 858 public static void main(String arg[]) { 860 try { 861 String readFile = arg[0]; 862 String writeFile = arg[1]; 863 java.io.FileInputStream ss = 864 new java.io.FileInputStream (readFile); 865 ManagedMemoryDataSource ms = 866 new ManagedMemoryDataSource(ss, 1024 * 1024, "foo/data", true); 867 javax.activation.DataHandler dh = 868 new javax.activation.DataHandler (ms); 869 java.io.InputStream is = dh.getInputStream(); 870 java.io.FileOutputStream fo = 871 new java.io.FileOutputStream (writeFile); 872 byte[] buf = new byte[512]; 873 int read = 0; 874 875 do { 876 read = is.read(buf); 877 878 if (read > 0) { 879 fo.write(buf, 0, read); 880 } 881 } while (read > -1); 882 883 fo.close(); 884 is.close(); 885 } catch (java.lang.Exception e) { 886 log.error(Messages.getMessage("exception00"), e); 887 } 888 } 889 890 894 public File getDiskCacheFile() { 895 return diskCacheFile; 896 } 897 } 898 | Popular Tags |