KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > mina > filter > executor > ExecutorFilter


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *
19  */

20 package org.apache.mina.filter.executor;
21
22 import java.util.LinkedList JavaDoc;
23 import java.util.Queue JavaDoc;
24 import java.util.concurrent.Executor JavaDoc;
25 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
26 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
27 import java.util.concurrent.TimeUnit JavaDoc;
28
29 import org.apache.mina.common.IdleStatus;
30 import org.apache.mina.common.IoFilterAdapter;
31 import org.apache.mina.common.IoFilterChain;
32 import org.apache.mina.common.IoSession;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * A filter that forward events to {@link Executor} in
38  * <a HREF="http://dcl.mathcs.emory.edu/util/backport-util-concurrent/">backport-util-concurrent</a>.
39  * You can apply various thread model by inserting this filter to the {@link IoFilterChain}.
40  * <p>
41  * Please note that this filter doesn't manage the life cycle of the underlying
42  * {@link Executor}. You have to destroy or stop it by yourself.
43  *
44  * @author The Apache MINA Project (dev@mina.apache.org)
45  * @version $Rev: 350169 $, $Date: 2005-12-01 00:17:41 -0500 (Thu, 01 Dec 2005) $
46  */

47 public class ExecutorFilter extends IoFilterAdapter {
48     private final Logger logger = LoggerFactory.getLogger(getClass());
49
50     private final Executor JavaDoc executor;
51
52     /**
53      * Creates a new instace with the default thread pool implementation
54      * (<tt>new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue() )</tt>).
55      */

56     public ExecutorFilter() {
57         this(new ThreadPoolExecutor JavaDoc(16, 16, 60, TimeUnit.SECONDS,
58                 new LinkedBlockingQueue JavaDoc<Runnable JavaDoc>()));
59     }
60
61     /**
62      * Creates a new instance with the specified <tt>executor</tt>.
63      */

64     public ExecutorFilter(Executor JavaDoc executor) {
65         if (executor == null) {
66             throw new NullPointerException JavaDoc("executor");
67         }
68
69         this.executor = executor;
70     }
71
72     /**
73      * Returns the underlying {@link Executor} instance this filter uses.
74      */

75     public Executor JavaDoc getExecutor() {
76         return executor;
77     }
78
79     private void fireEvent(NextFilter nextFilter, IoSession session,
80             EventType type, Object JavaDoc data) {
81         Event event = new Event(type, nextFilter, data);
82         SessionBuffer buf = SessionBuffer.getSessionBuffer(session);
83
84         boolean execute;
85         synchronized (buf.eventQueue) {
86             buf.eventQueue.offer(event);
87             if (buf.processingCompleted) {
88                 buf.processingCompleted = false;
89                 execute = true;
90             } else {
91                 execute = false;
92             }
93         }
94
95         if (execute) {
96             if (logger.isDebugEnabled()) {
97                 logger.debug("Launching thread for "
98                         + session.getRemoteAddress());
99             }
100
101             executor.execute(new ProcessEventsRunnable(buf));
102         }
103     }
104
105     private static class SessionBuffer {
106         private static final String JavaDoc KEY = SessionBuffer.class.getName()
107                 + ".KEY";
108
109         private static SessionBuffer getSessionBuffer(IoSession session) {
110             synchronized (session) {
111                 SessionBuffer buf = (SessionBuffer) session.getAttribute(KEY);
112                 if (buf == null) {
113                     buf = new SessionBuffer(session);
114                     session.setAttribute(KEY, buf);
115                 }
116                 return buf;
117             }
118         }
119
120         private final IoSession session;
121
122         private final Queue JavaDoc<Event> eventQueue = new LinkedList JavaDoc<Event>();
123
124         private boolean processingCompleted = true;
125
126         private SessionBuffer(IoSession session) {
127             this.session = session;
128         }
129     }
130
131     protected static class EventType {
132         public static final EventType OPENED = new EventType("OPENED");
133
134         public static final EventType CLOSED = new EventType("CLOSED");
135
136         public static final EventType READ = new EventType("READ");
137
138         public static final EventType WRITTEN = new EventType("WRITTEN");
139
140         public static final EventType RECEIVED = new EventType("RECEIVED");
141
142         public static final EventType SENT = new EventType("SENT");
143
144         public static final EventType IDLE = new EventType("IDLE");
145
146         public static final EventType EXCEPTION = new EventType("EXCEPTION");
147
148         private final String JavaDoc value;
149
150         private EventType(String JavaDoc value) {
151             this.value = value;
152         }
153
154         public String JavaDoc toString() {
155             return value;
156         }
157     }
158
159     protected static class Event {
160         private final EventType type;
161
162         private final NextFilter nextFilter;
163
164         private final Object JavaDoc data;
165
166         Event(EventType type, NextFilter nextFilter, Object JavaDoc data) {
167             this.type = type;
168             this.nextFilter = nextFilter;
169             this.data = data;
170         }
171
172         public Object JavaDoc getData() {
173             return data;
174         }
175
176         public NextFilter getNextFilter() {
177             return nextFilter;
178         }
179
180         public EventType getType() {
181             return type;
182         }
183     }
184
185     public void sessionCreated(NextFilter nextFilter, IoSession session) {
186         nextFilter.sessionCreated(session);
187     }
188
189     public void sessionOpened(NextFilter nextFilter, IoSession session) {
190         fireEvent(nextFilter, session, EventType.OPENED, null);
191     }
192
193     public void sessionClosed(NextFilter nextFilter, IoSession session) {
194         fireEvent(nextFilter, session, EventType.CLOSED, null);
195     }
196
197     public void sessionIdle(NextFilter nextFilter, IoSession session,
198             IdleStatus status) {
199         fireEvent(nextFilter, session, EventType.IDLE, status);
200     }
201
202     public void exceptionCaught(NextFilter nextFilter, IoSession session,
203             Throwable JavaDoc cause) {
204         fireEvent(nextFilter, session, EventType.EXCEPTION, cause);
205     }
206
207     public void messageReceived(NextFilter nextFilter, IoSession session,
208             Object JavaDoc message) {
209         fireEvent(nextFilter, session, EventType.RECEIVED, message);
210     }
211
212     public void messageSent(NextFilter nextFilter, IoSession session,
213             Object JavaDoc message) {
214         fireEvent(nextFilter, session, EventType.SENT, message);
215     }
216
217     protected void processEvent(NextFilter nextFilter, IoSession session,
218             EventType type, Object JavaDoc data) {
219         if (type == EventType.RECEIVED) {
220             nextFilter.messageReceived(session, data);
221         } else if (type == EventType.SENT) {
222             nextFilter.messageSent(session, data);
223         } else if (type == EventType.EXCEPTION) {
224             nextFilter.exceptionCaught(session, (Throwable JavaDoc) data);
225         } else if (type == EventType.IDLE) {
226             nextFilter.sessionIdle(session, (IdleStatus) data);
227         } else if (type == EventType.OPENED) {
228             nextFilter.sessionOpened(session);
229         } else if (type == EventType.CLOSED) {
230             nextFilter.sessionClosed(session);
231         }
232     }
233
234     public void filterWrite(NextFilter nextFilter, IoSession session,
235             WriteRequest writeRequest) {
236         nextFilter.filterWrite(session, writeRequest);
237     }
238
239     public void filterClose(NextFilter nextFilter, IoSession session)
240             throws Exception JavaDoc {
241         nextFilter.filterClose(session);
242     }
243
244     private class ProcessEventsRunnable implements Runnable JavaDoc {
245         private final SessionBuffer buffer;
246
247         ProcessEventsRunnable(SessionBuffer buffer) {
248             this.buffer = buffer;
249         }
250
251         public void run() {
252             while (true) {
253                 Event event;
254
255                 synchronized (buffer.eventQueue) {
256                     event = buffer.eventQueue.poll();
257
258                     if (event == null) {
259                         buffer.processingCompleted = true;
260                         break;
261                     }
262                 }
263
264                 processEvent(event.getNextFilter(), buffer.session, event
265                         .getType(), event.getData());
266             }
267
268             if (logger.isDebugEnabled()) {
269                 logger.debug("Exiting since queue is empty for "
270                         + buffer.session.getRemoteAddress());
271             }
272         }
273     }
274 }
275
Popular Tags