1 20 package org.apache.mina.filter; 21 22 import java.io.IOException ; 23 import java.io.InputStream ; 24 import java.util.Queue ; 25 import java.util.concurrent.ConcurrentLinkedQueue ; 26 27 import org.apache.mina.common.ByteBuffer; 28 import org.apache.mina.common.IoFilterAdapter; 29 import org.apache.mina.common.IoSession; 30 import org.apache.mina.common.WriteFuture; 31 32 57 public class StreamWriteFilter extends IoFilterAdapter { 58 61 public static final int DEFAULT_STREAM_BUFFER_SIZE = 4096; 62 63 66 public static final String CURRENT_STREAM = StreamWriteFilter.class 67 .getName() 68 + ".stream"; 69 70 protected static final String WRITE_REQUEST_QUEUE = StreamWriteFilter.class 71 .getName() 72 + ".queue"; 73 74 protected static final String INITIAL_WRITE_FUTURE = StreamWriteFilter.class 75 .getName() 76 + ".future"; 77 78 private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE; 79 80 @Override 81 public void filterWrite(NextFilter nextFilter, IoSession session, 82 WriteRequest writeRequest) throws Exception { 83 if (session.getAttribute(CURRENT_STREAM) != null) { 85 Queue <WriteRequest> queue = getWriteRequestQueue(session); 86 if (queue == null) { 87 queue = new ConcurrentLinkedQueue <WriteRequest>(); 88 session.setAttribute(WRITE_REQUEST_QUEUE, queue); 89 } 90 queue.add(writeRequest); 91 return; 92 } 93 94 Object message = writeRequest.getMessage(); 95 96 if (message instanceof InputStream ) { 97 98 InputStream inputStream = (InputStream ) message; 99 100 ByteBuffer byteBuffer = getNextByteBuffer(inputStream); 101 if (byteBuffer == null) { 102 writeRequest.getFuture().setWritten(true); 104 nextFilter.messageSent(session, message); 105 } else { 106 session.setAttribute(CURRENT_STREAM, inputStream); 107 session.setAttribute(INITIAL_WRITE_FUTURE, writeRequest 108 .getFuture()); 109 110 nextFilter.filterWrite(session, new WriteRequest(byteBuffer)); 111 } 112 113 } else { 114 nextFilter.filterWrite(session, writeRequest); 115 } 116 } 117 118 @SuppressWarnings ("unchecked") 119 private Queue <WriteRequest> getWriteRequestQueue(IoSession session) { 120 return (Queue <WriteRequest>) session.getAttribute(WRITE_REQUEST_QUEUE); 121 } 122 123 @Override 124 public void messageSent(NextFilter nextFilter, IoSession session, 125 Object message) throws Exception { 126 InputStream inputStream = (InputStream ) session 127 .getAttribute(CURRENT_STREAM); 128 129 if (inputStream == null) { 130 nextFilter.messageSent(session, message); 131 } else { 132 ByteBuffer byteBuffer = getNextByteBuffer(inputStream); 133 134 if (byteBuffer == null) { 135 session.removeAttribute(CURRENT_STREAM); 137 WriteFuture writeFuture = (WriteFuture) session 138 .removeAttribute(INITIAL_WRITE_FUTURE); 139 140 Queue <? extends WriteRequest> queue = (Queue <? extends WriteRequest>) session 142 .removeAttribute(WRITE_REQUEST_QUEUE); 143 if (queue != null) { 144 WriteRequest wr = queue.poll(); 145 while (wr != null) { 146 filterWrite(nextFilter, session, wr); 147 wr = queue.poll(); 148 } 149 } 150 151 writeFuture.setWritten(true); 152 nextFilter.messageSent(session, inputStream); 153 } else { 154 nextFilter.filterWrite(session, new WriteRequest(byteBuffer)); 155 } 156 } 157 } 158 159 private ByteBuffer getNextByteBuffer(InputStream is) throws IOException { 160 byte[] bytes = new byte[writeBufferSize]; 161 162 int off = 0; 163 int n = 0; 164 while (off < bytes.length 165 && (n = is.read(bytes, off, bytes.length - off)) != -1) { 166 off += n; 167 } 168 169 if (n == -1 && off == 0) { 170 return null; 171 } 172 173 return ByteBuffer.wrap(bytes, 0, off); 174 } 175 176 182 public int getWriteBufferSize() { 183 return writeBufferSize; 184 } 185 186 192 public void setWriteBufferSize(int writeBufferSize) { 193 if (writeBufferSize < 1) { 194 throw new IllegalArgumentException ( 195 "writeBufferSize must be at least 1"); 196 } 197 this.writeBufferSize = writeBufferSize; 198 } 199 200 } 201 | Popular Tags |