package org.apache.mina.filter;

import java.util.Iterator;
import java.util.List;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilterChain.Entry;
import org.apache.mina.filter.executor.ExecutorFilter;

/**
 * Installs a pair of filters around an {@link ExecutorFilter} that throttle
 * reads on a session based on the number of bytes currently in flight
 * between the two filters.
 * <p>
 * The filter placed before the {@link ExecutorFilter} adds the size of every
 * received {@link ByteBuffer} to a per-session counter and suspends reads
 * once the counter reaches the configured maximum. The filter placed after
 * the {@link ExecutorFilter} subtracts the size again and resumes reads once
 * the counter has dropped below the maximum.
 * <p>
 * Counter updates are performed while holding the session's monitor.
 */
public class ReadThrottleFilterBuilder {
    /** Session attribute key under which the in-flight byte count is stored. */
    public static final String COUNTER = ReadThrottleFilterBuilder.class
            .getName()
            + ".counter";

    /** Session attribute key marking a session whose reads this class suspended. */
    public static final String SUSPENDED_READS = ReadThrottleFilterBuilder.class
            .getName()
            + ".suspendedReads";

    /** Threshold in bytes at which reads are suspended; defaults to 1 MiB. */
    private volatile int maximumConnectionBufferSize = 1024 * 1024;

    /**
     * Sets the per-session threshold, in bytes, at which reads are suspended.
     * <p>
     * No validation is performed. Note that with a non-positive value the
     * suspend condition ({@code counter >= maximum}) holds for the first
     * buffer received while the resume condition ({@code counter < maximum})
     * can never hold, so callers should pass a positive value.
     *
     * @param maximumConnectionBufferSize the new threshold in bytes
     */
    public void setMaximumConnectionBufferSize(int maximumConnectionBufferSize) {
        this.maximumConnectionBufferSize = maximumConnectionBufferSize;
    }

    /**
     * Attaches the throttle filters to an existing filter chain, directly
     * before and directly after the chain's {@link ExecutorFilter}.
     *
     * @param chain the chain to modify
     * @throws IllegalStateException if the chain contains no
     *         {@link ExecutorFilter}
     */
    public void attach(IoFilterChain chain) {
        String name = getThreadPoolFilterEntryName(chain.getAll());

        chain.addBefore(name, getClass().getName() + ".add", new Add());
        chain.addAfter(name, getClass().getName() + ".release", new Release());
    }

    /**
     * Attaches the throttle filters to a chain builder, directly before and
     * directly after the builder's {@link ExecutorFilter}.
     *
     * @param builder the chain builder to modify
     * @throws IllegalStateException if the builder contains no
     *         {@link ExecutorFilter}
     */
    public void attach(DefaultIoFilterChainBuilder builder) {
        String name = getThreadPoolFilterEntryName(builder.getAll());

        builder.addBefore(name, getClass().getName() + ".add", new Add());
        builder.addAfter(name, getClass().getName() + ".release",
                new Release());
    }

    /**
     * Returns the name of the first entry whose filter is an
     * {@link ExecutorFilter} (or a subclass of it).
     *
     * @param entries the chain entries to search, in chain order
     * @return the name of the matching entry
     * @throws IllegalStateException if no entry holds an
     *         {@link ExecutorFilter}
     */
    private String getThreadPoolFilterEntryName(List<Entry> entries) {
        Iterator<Entry> i = entries.iterator();

        while (i.hasNext()) {
            IoFilterChain.Entry entry = i.next();

            // instanceof matches ExecutorFilter and its subclasses. The
            // previous getClass().isAssignableFrom(ExecutorFilter.class) test
            // was inverted: it matched supertypes of ExecutorFilter instead.
            if (entry.getFilter() instanceof ExecutorFilter) {
                return entry.getName();
            }
        }

        throw new IllegalStateException(
                "Chain does not contain a ExecutorFilter");
    }

    /**
     * Adds {@code size} bytes to the session's counter and suspends reads if
     * the threshold has been reached and the session is currently readable.
     *
     * @param session the session that received the data
     * @param size number of bytes received
     */
    private void add(IoSession session, int size) {
        synchronized (session) {
            int counter = getCounter(session) + size;

            session.setAttribute(COUNTER, Integer.valueOf(counter));

            if (counter >= maximumConnectionBufferSize
                    && session.getTrafficMask().isReadable()) {
                session.suspendRead();
                // Mark the session so that release() only resumes reads that
                // were suspended here. The value-less setAttribute is expected
                // to store Boolean.TRUE, which isSuspendedReads() reads back.
                session.setAttribute(SUSPENDED_READS);
            }
        }
    }

    /**
     * Subtracts {@code size} bytes from the session's counter (never going
     * below zero) and resumes reads if the counter is below the threshold
     * and this class had suspended them.
     *
     * @param session the session whose data has passed the executor
     * @param size number of bytes released
     */
    private void release(IoSession session, int size) {
        synchronized (session) {
            int counter = Math.max(0, getCounter(session) - size);

            session.setAttribute(COUNTER, Integer.valueOf(counter));

            if (counter < maximumConnectionBufferSize
                    && isSuspendedReads(session)) {
                session.resumeRead();
                session.removeAttribute(SUSPENDED_READS);
            }
        }
    }

    /**
     * Tells whether the session carries the {@link #SUSPENDED_READS} mark
     * with a {@code true} value.
     */
    private boolean isSuspendedReads(IoSession session) {
        Boolean flag = (Boolean) session.getAttribute(SUSPENDED_READS);

        return null != flag && flag.booleanValue();
    }

    /**
     * Returns the session's current counter value, or {@code 0} when the
     * {@link #COUNTER} attribute has not been set yet.
     */
    private int getCounter(IoSession session) {
        Integer i = (Integer) session.getAttribute(COUNTER);
        return null == i ? 0 : i.intValue();
    }

    /**
     * Filter installed before the {@link ExecutorFilter}: counts the
     * remaining bytes of each received {@link ByteBuffer}, then forwards the
     * message unchanged.
     */
    private class Add extends IoFilterAdapter {
        public void messageReceived(NextFilter nextFilter, IoSession session,
                Object message) throws Exception {
            if (message instanceof ByteBuffer) {
                add(session, ((ByteBuffer) message).remaining());
            }

            nextFilter.messageReceived(session, message);
        }
    }

    /**
     * Filter installed after the {@link ExecutorFilter}: releases the
     * remaining bytes of each received {@link ByteBuffer}, then forwards the
     * message unchanged.
     */
    private class Release extends IoFilterAdapter {
        public void messageReceived(NextFilter nextFilter, IoSession session,
                Object message) throws Exception {
            if (message instanceof ByteBuffer) {
                release(session, ((ByteBuffer) message).remaining());
            }

            nextFilter.messageReceived(session, message);
        }
    }
}