1 20 package org.apache.mina.filter.codec; 21 22 import org.apache.mina.common.ByteBuffer; 23 import org.apache.mina.common.ByteBufferProxy; 24 import org.apache.mina.common.IoFilter; 25 import org.apache.mina.common.IoFilterAdapter; 26 import org.apache.mina.common.IoFilterChain; 27 import org.apache.mina.common.IoSession; 28 import org.apache.mina.common.WriteFuture; 29 import org.apache.mina.common.support.DefaultWriteFuture; 30 import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; 31 import org.apache.mina.filter.codec.support.SimpleProtocolEncoderOutput; 32 import org.apache.mina.util.SessionLog; 33 34 42 public class ProtocolCodecFilter extends IoFilterAdapter { 43 public static final String ENCODER = ProtocolCodecFilter.class.getName() 44 + ".encoder"; 45 46 public static final String DECODER = ProtocolCodecFilter.class.getName() 47 + ".decoder"; 48 49 private static final String DECODER_LOCK = ProtocolCodecFilter.class 50 .getName() 51 + ".decoderLock"; 52 53 private static final Class <?>[] EMPTY_PARAMS = new Class [0]; 54 55 private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(new byte[0]); 56 57 private final ProtocolCodecFactory factory; 58 59 public ProtocolCodecFilter(ProtocolCodecFactory factory) { 60 if (factory == null) { 61 throw new NullPointerException ("factory"); 62 } 63 this.factory = factory; 64 } 65 66 public ProtocolCodecFilter(final ProtocolEncoder encoder, 67 final ProtocolDecoder decoder) { 68 if (encoder == null) { 69 throw new NullPointerException ("encoder"); 70 } 71 if (decoder == null) { 72 throw new NullPointerException ("decoder"); 73 } 74 75 this.factory = new ProtocolCodecFactory() { 76 public ProtocolEncoder getEncoder() { 77 return encoder; 78 } 79 80 public ProtocolDecoder getDecoder() { 81 return decoder; 82 } 83 }; 84 } 85 86 public ProtocolCodecFilter( 87 final Class <? extends ProtocolEncoder> encoderClass, 88 final Class <? extends ProtocolDecoder> decoderClass) { 89 if (encoderClass == null) { 90 throw new NullPointerException ("encoderClass"); 91 } 92 if (decoderClass == null) { 93 throw new NullPointerException ("decoderClass"); 94 } 95 if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) { 96 throw new IllegalArgumentException ("encoderClass: " 97 + encoderClass.getName()); 98 } 99 if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) { 100 throw new IllegalArgumentException ("decoderClass: " 101 + decoderClass.getName()); 102 } 103 try { 104 encoderClass.getConstructor(EMPTY_PARAMS); 105 } catch (NoSuchMethodException e) { 106 throw new IllegalArgumentException ( 107 "encoderClass doesn't have a public default constructor."); 108 } 109 try { 110 decoderClass.getConstructor(EMPTY_PARAMS); 111 } catch (NoSuchMethodException e) { 112 throw new IllegalArgumentException ( 113 "decoderClass doesn't have a public default constructor."); 114 } 115 116 this.factory = new ProtocolCodecFactory() { 117 public ProtocolEncoder getEncoder() throws Exception { 118 return encoderClass.newInstance(); 119 } 120 121 public ProtocolDecoder getDecoder() throws Exception { 122 return decoderClass.newInstance(); 123 } 124 }; 125 } 126 127 @Override 128 public void onPreAdd(IoFilterChain parent, String name, 129 NextFilter nextFilter) throws Exception { 130 if (parent.contains(ProtocolCodecFilter.class)) { 131 throw new IllegalStateException ( 132 "A filter chain cannot contain more than one ProtocolCodecFilter."); 133 } 134 } 135 136 public void onPostRemove(IoFilterChain parent, String name, 137 NextFilter nextFilter) throws Exception { 138 disposeEncoder(parent.getSession()); 139 disposeDecoder(parent.getSession()); 140 } 141 142 @Override 143 public void messageReceived(NextFilter nextFilter, IoSession session, 144 Object message) throws Exception { 145 if (!(message instanceof ByteBuffer)) { 146 nextFilter.messageReceived(session, message); 147 return; 148 } 149 150 ByteBuffer in = (ByteBuffer) message; 151 ProtocolDecoder decoder = getDecoder(session); 152 Object decoderLock = getDecoderLock(session); 153 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); 154 155 try { 156 synchronized (decoderLock) { 157 decoder.decode(session, in, decoderOut); 158 } 159 } catch (Throwable t) { 160 ProtocolDecoderException pde; 161 if (t instanceof ProtocolDecoderException) { 162 pde = (ProtocolDecoderException) t; 163 } else { 164 pde = new ProtocolDecoderException(t); 165 } 166 pde.setHexdump(in.getHexDump()); 167 throw pde; 168 } finally { 169 if (session.getTransportType().isConnectionless()) { 171 disposeDecoder(session); 172 } 173 174 in.release(); 176 177 decoderOut.flush(); 178 } 179 } 180 181 @Override 182 public void messageSent(NextFilter nextFilter, IoSession session, 183 Object message) throws Exception { 184 if (message instanceof HiddenByteBuffer) { 185 return; 186 } 187 188 if (!(message instanceof MessageByteBuffer)) { 189 nextFilter.messageSent(session, message); 190 return; 191 } 192 193 nextFilter.messageSent(session, ((MessageByteBuffer) message).message); 194 } 195 196 @Override 197 public void filterWrite(NextFilter nextFilter, IoSession session, 198 WriteRequest writeRequest) throws Exception { 199 Object message = writeRequest.getMessage(); 200 if (message instanceof ByteBuffer) { 201 nextFilter.filterWrite(session, writeRequest); 202 return; 203 } 204 205 ProtocolEncoder encoder = getEncoder(session); 206 ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session, 207 nextFilter, writeRequest); 208 209 try { 210 encoder.encode(session, message, encoderOut); 211 encoderOut.flush(); 212 nextFilter.filterWrite(session, new WriteRequest( 213 new MessageByteBuffer(writeRequest.getMessage()), 214 writeRequest.getFuture(), writeRequest.getDestination())); 215 } catch (Throwable t) { 216 ProtocolEncoderException pee; 217 if (t instanceof ProtocolEncoderException) { 218 pee = (ProtocolEncoderException) t; 219 } else { 220 pee = new ProtocolEncoderException(t); 221 } 222 throw pee; 223 } finally { 224 if (session.getTransportType().isConnectionless()) { 226 disposeEncoder(session); 227 } 228 } 229 } 230 231 @Override 232 public void sessionClosed(NextFilter nextFilter, IoSession session) 233 throws Exception { 234 ProtocolDecoder decoder = getDecoder(session); 236 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); 237 try { 238 decoder.finishDecode(session, decoderOut); 239 } catch (Throwable t) { 240 ProtocolDecoderException pde; 241 if (t instanceof ProtocolDecoderException) { 242 pde = (ProtocolDecoderException) t; 243 } else { 244 pde = new ProtocolDecoderException(t); 245 } 246 throw pde; 247 } finally { 248 disposeEncoder(session); 250 disposeDecoder(session); 251 252 decoderOut.flush(); 253 } 254 255 nextFilter.sessionClosed(session); 256 } 257 258 private ProtocolEncoder getEncoder(IoSession session) throws Exception { 259 ProtocolEncoder encoder = (ProtocolEncoder) session 260 .getAttribute(ENCODER); 261 if (encoder == null) { 262 encoder = factory.getEncoder(); 263 session.setAttribute(ENCODER, encoder); 264 } 265 return encoder; 266 } 267 268 private ProtocolEncoderOutputImpl getEncoderOut(IoSession session, 269 NextFilter nextFilter, WriteRequest writeRequest) { 270 return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest); 271 } 272 273 private ProtocolDecoder getDecoder(IoSession session) throws Exception { 274 ProtocolDecoder decoder = (ProtocolDecoder) session 275 .getAttribute(DECODER); 276 if (decoder == null) { 277 decoder = factory.getDecoder(); 278 session.setAttribute(DECODER, decoder); 279 } 280 return decoder; 281 } 282 283 private Object getDecoderLock(IoSession session) { 284 Object lock = session.getAttribute(DECODER_LOCK); 285 if (lock == null) { 286 lock = new Object (); 287 session.setAttribute(DECODER_LOCK, lock); 288 } 289 290 return lock; 291 } 292 293 private ProtocolDecoderOutput getDecoderOut(IoSession session, 294 NextFilter nextFilter) { 295 return new SimpleProtocolDecoderOutput(session, nextFilter); 296 } 297 298 private void disposeEncoder(IoSession session) { 299 ProtocolEncoder encoder = (ProtocolEncoder) session 300 .removeAttribute(ENCODER); 301 if (encoder == null) { 302 return; 303 } 304 305 try { 306 encoder.dispose(session); 307 } catch (Throwable t) { 308 SessionLog.warn(session, "Failed to dispose: " 309 + encoder.getClass().getName() + " (" + encoder + ')'); 310 } 311 } 312 313 private void disposeDecoder(IoSession session) { 314 ProtocolDecoder decoder = (ProtocolDecoder) session 315 .removeAttribute(DECODER); 316 if (decoder == null) { 317 return; 318 } 319 320 try { 321 decoder.dispose(session); 322 } catch (Throwable t) { 323 SessionLog.warn(session, "Falied to dispose: " 324 + decoder.getClass().getName() + " (" + decoder + ')'); 325 } 326 } 327 328 private static class HiddenByteBuffer extends ByteBufferProxy { 329 private HiddenByteBuffer(ByteBuffer buf) { 330 super(buf); 331 } 332 } 333 334 private static class MessageByteBuffer extends ByteBufferProxy { 335 private final Object message; 336 337 private MessageByteBuffer(Object message) { 338 super(EMPTY_BUFFER); 339 this.message = message; 340 } 341 342 @Override 343 public void acquire() { 344 } 346 347 @Override 348 public void release() { 349 } 351 } 352 353 private static class ProtocolEncoderOutputImpl extends 354 SimpleProtocolEncoderOutput { 355 private final IoSession session; 356 357 private final NextFilter nextFilter; 358 359 private final WriteRequest writeRequest; 360 361 ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, 362 WriteRequest writeRequest) { 363 this.session = session; 364 this.nextFilter = nextFilter; 365 this.writeRequest = writeRequest; 366 } 367 368 @Override 369 protected WriteFuture doFlush(ByteBuffer buf) { 370 WriteFuture future = new DefaultWriteFuture(session); 371 nextFilter.filterWrite(session, new WriteRequest( 372 new HiddenByteBuffer(buf), future, writeRequest 373 .getDestination())); 374 return future; 375 } 376 } 377 } 378 | Popular Tags |