1 20 package org.apache.mina.filter.executor; 21 22 import java.util.LinkedList ; 23 import java.util.Queue ; 24 import java.util.concurrent.Executor ; 25 import java.util.concurrent.LinkedBlockingQueue ; 26 import java.util.concurrent.ThreadPoolExecutor ; 27 import java.util.concurrent.TimeUnit ; 28 29 import org.apache.mina.common.IdleStatus; 30 import org.apache.mina.common.IoFilterAdapter; 31 import org.apache.mina.common.IoFilterChain; 32 import org.apache.mina.common.IoSession; 33 import org.slf4j.Logger; 34 import org.slf4j.LoggerFactory; 35 36 47 public class ExecutorFilter extends IoFilterAdapter { 48 private final Logger logger = LoggerFactory.getLogger(getClass()); 49 50 private final Executor executor; 51 52 56 public ExecutorFilter() { 57 this(new ThreadPoolExecutor (16, 16, 60, TimeUnit.SECONDS, 58 new LinkedBlockingQueue <Runnable >())); 59 } 60 61 64 public ExecutorFilter(Executor executor) { 65 if (executor == null) { 66 throw new NullPointerException ("executor"); 67 } 68 69 this.executor = executor; 70 } 71 72 75 public Executor getExecutor() { 76 return executor; 77 } 78 79 private void fireEvent(NextFilter nextFilter, IoSession session, 80 EventType type, Object data) { 81 Event event = new Event(type, nextFilter, data); 82 SessionBuffer buf = SessionBuffer.getSessionBuffer(session); 83 84 boolean execute; 85 synchronized (buf.eventQueue) { 86 buf.eventQueue.offer(event); 87 if (buf.processingCompleted) { 88 buf.processingCompleted = false; 89 execute = true; 90 } else { 91 execute = false; 92 } 93 } 94 95 if (execute) { 96 if (logger.isDebugEnabled()) { 97 logger.debug("Launching thread for " 98 + session.getRemoteAddress()); 99 } 100 101 executor.execute(new ProcessEventsRunnable(buf)); 102 } 103 } 104 105 private static class SessionBuffer { 106 private static final String KEY = SessionBuffer.class.getName() 107 + ".KEY"; 108 109 private static SessionBuffer getSessionBuffer(IoSession session) { 110 synchronized (session) { 111 SessionBuffer buf = (SessionBuffer) session.getAttribute(KEY); 112 if (buf == null) { 113 buf = new SessionBuffer(session); 114 session.setAttribute(KEY, buf); 115 } 116 return buf; 117 } 118 } 119 120 private final IoSession session; 121 122 private final Queue <Event> eventQueue = new LinkedList <Event>(); 123 124 private boolean processingCompleted = true; 125 126 private SessionBuffer(IoSession session) { 127 this.session = session; 128 } 129 } 130 131 protected static class EventType { 132 public static final EventType OPENED = new EventType("OPENED"); 133 134 public static final EventType CLOSED = new EventType("CLOSED"); 135 136 public static final EventType READ = new EventType("READ"); 137 138 public static final EventType WRITTEN = new EventType("WRITTEN"); 139 140 public static final EventType RECEIVED = new EventType("RECEIVED"); 141 142 public static final EventType SENT = new EventType("SENT"); 143 144 public static final EventType IDLE = new EventType("IDLE"); 145 146 public static final EventType EXCEPTION = new EventType("EXCEPTION"); 147 148 private final String value; 149 150 private EventType(String value) { 151 this.value = value; 152 } 153 154 public String toString() { 155 return value; 156 } 157 } 158 159 protected static class Event { 160 private final EventType type; 161 162 private final NextFilter nextFilter; 163 164 private final Object data; 165 166 Event(EventType type, NextFilter nextFilter, Object data) { 167 this.type = type; 168 this.nextFilter = nextFilter; 169 this.data = data; 170 } 171 172 public Object getData() { 173 return data; 174 } 175 176 public NextFilter getNextFilter() { 177 return nextFilter; 178 } 179 180 public EventType getType() { 181 return type; 182 } 183 } 184 185 public void sessionCreated(NextFilter nextFilter, IoSession session) { 186 nextFilter.sessionCreated(session); 187 } 188 189 public void sessionOpened(NextFilter nextFilter, IoSession session) { 190 fireEvent(nextFilter, session, EventType.OPENED, null); 191 } 192 193 public void sessionClosed(NextFilter nextFilter, IoSession session) { 194 fireEvent(nextFilter, session, EventType.CLOSED, null); 195 } 196 197 public void sessionIdle(NextFilter nextFilter, IoSession session, 198 IdleStatus status) { 199 fireEvent(nextFilter, session, EventType.IDLE, status); 200 } 201 202 public void exceptionCaught(NextFilter nextFilter, IoSession session, 203 Throwable cause) { 204 fireEvent(nextFilter, session, EventType.EXCEPTION, cause); 205 } 206 207 public void messageReceived(NextFilter nextFilter, IoSession session, 208 Object message) { 209 fireEvent(nextFilter, session, EventType.RECEIVED, message); 210 } 211 212 public void messageSent(NextFilter nextFilter, IoSession session, 213 Object message) { 214 fireEvent(nextFilter, session, EventType.SENT, message); 215 } 216 217 protected void processEvent(NextFilter nextFilter, IoSession session, 218 EventType type, Object data) { 219 if (type == EventType.RECEIVED) { 220 nextFilter.messageReceived(session, data); 221 } else if (type == EventType.SENT) { 222 nextFilter.messageSent(session, data); 223 } else if (type == EventType.EXCEPTION) { 224 nextFilter.exceptionCaught(session, (Throwable ) data); 225 } else if (type == EventType.IDLE) { 226 nextFilter.sessionIdle(session, (IdleStatus) data); 227 } else if (type == EventType.OPENED) { 228 nextFilter.sessionOpened(session); 229 } else if (type == EventType.CLOSED) { 230 nextFilter.sessionClosed(session); 231 } 232 } 233 234 public void filterWrite(NextFilter nextFilter, IoSession session, 235 WriteRequest writeRequest) { 236 nextFilter.filterWrite(session, writeRequest); 237 } 238 239 public void filterClose(NextFilter nextFilter, IoSession session) 240 throws Exception { 241 nextFilter.filterClose(session); 242 } 243 244 private class ProcessEventsRunnable implements Runnable { 245 private final SessionBuffer buffer; 246 247 ProcessEventsRunnable(SessionBuffer buffer) { 248 this.buffer = buffer; 249 } 250 251 public void run() { 252 while (true) { 253 Event event; 254 255 synchronized (buffer.eventQueue) { 256 event = buffer.eventQueue.poll(); 257 258 if (event == null) { 259 buffer.processingCompleted = true; 260 break; 261 } 262 } 263 264 processEvent(event.getNextFilter(), buffer.session, event 265 .getType(), event.getData()); 266 } 267 268 if (logger.isDebugEnabled()) { 269 logger.debug("Exiting since queue is empty for " 270 + buffer.session.getRemoteAddress()); 271 } 272 } 273 } 274 } 275 | Popular Tags |