KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > mina > transport > vmpipe > support > VmPipeFilterChain


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  *
19  */

20 package org.apache.mina.transport.vmpipe.support;
21
22 import java.util.Queue JavaDoc;
23 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
24
25 import org.apache.mina.common.ByteBuffer;
26 import org.apache.mina.common.IdleStatus;
27 import org.apache.mina.common.IoSession;
28 import org.apache.mina.common.IoFilter.WriteRequest;
29 import org.apache.mina.common.support.AbstractIoFilterChain;
30
31 /**
32  * @author The Apache Directory Project (mina-dev@directory.apache.org)
33  * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13 7월 2007) $
34  */

35 public class VmPipeFilterChain extends AbstractIoFilterChain {
36
37     private final Queue JavaDoc<Event> eventQueue = new ConcurrentLinkedQueue JavaDoc<Event>();
38
39     private boolean flushEnabled;
40
41     public VmPipeFilterChain(IoSession session) {
42         super(session);
43     }
44
45     public void start() {
46         flushEnabled = true;
47         flushEvents();
48     }
49
50     private void pushEvent(Event e) {
51         eventQueue.offer(e);
52         if (flushEnabled) {
53             flushEvents();
54         }
55     }
56
57     private void flushEvents() {
58         Event e;
59         while ((e = eventQueue.poll()) != null) {
60             fireEvent(e);
61         }
62     }
63
64     private void fireEvent(Event e) {
65         IoSession session = getSession();
66         EventType type = e.getType();
67         Object JavaDoc data = e.getData();
68
69         if (type == EventType.RECEIVED) {
70             VmPipeSessionImpl s = (VmPipeSessionImpl) session;
71             synchronized (s.lock) {
72                 if (!s.getTrafficMask().isReadable()) {
73                     s.pendingDataQueue.offer(data);
74                 } else {
75                     int byteCount = 1;
76                     if (data instanceof ByteBuffer) {
77                         byteCount = ((ByteBuffer) data).remaining();
78                     }
79
80                     s.increaseReadBytes(byteCount);
81
82                     super.fireMessageReceived(s, data);
83                 }
84             }
85         } else if (type == EventType.WRITE) {
86             super.fireFilterWrite(session, (WriteRequest) data);
87         } else if (type == EventType.SENT) {
88             super.fireMessageSent(session, (WriteRequest) data);
89         } else if (type == EventType.EXCEPTION) {
90             super.fireExceptionCaught(session, (Throwable JavaDoc) data);
91         } else if (type == EventType.IDLE) {
92             super.fireSessionIdle(session, (IdleStatus) data);
93         } else if (type == EventType.OPENED) {
94             super.fireSessionOpened(session);
95         } else if (type == EventType.CREATED) {
96             super.fireSessionCreated(session);
97         } else if (type == EventType.CLOSED) {
98             super.fireSessionClosed(session);
99         } else if (type == EventType.CLOSE) {
100             super.fireFilterClose(session);
101         }
102     }
103
104     @Override JavaDoc
105     public void fireFilterClose(IoSession session) {
106         pushEvent(new Event(EventType.CLOSE, null));
107     }
108
109     @Override JavaDoc
110     public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
111         pushEvent(new Event(EventType.WRITE, writeRequest));
112     }
113
114     @Override JavaDoc
115     public void fireExceptionCaught(IoSession session, Throwable JavaDoc cause) {
116         pushEvent(new Event(EventType.EXCEPTION, cause));
117     }
118
119     @Override JavaDoc
120     public void fireMessageSent(IoSession session, WriteRequest request) {
121         pushEvent(new Event(EventType.SENT, request));
122     }
123
124     @Override JavaDoc
125     public void fireSessionClosed(IoSession session) {
126         pushEvent(new Event(EventType.CLOSED, null));
127     }
128
129     @Override JavaDoc
130     public void fireSessionCreated(IoSession session) {
131         pushEvent(new Event(EventType.CREATED, null));
132     }
133
134     @Override JavaDoc
135     public void fireSessionIdle(IoSession session, IdleStatus status) {
136         pushEvent(new Event(EventType.IDLE, status));
137     }
138
139     @Override JavaDoc
140     public void fireSessionOpened(IoSession session) {
141         pushEvent(new Event(EventType.OPENED, null));
142     }
143
144     @Override JavaDoc
145     public void fireMessageReceived(IoSession session, Object JavaDoc message) {
146         pushEvent(new Event(EventType.RECEIVED, message));
147     }
148
149     @Override JavaDoc
150     protected void doWrite(IoSession session, WriteRequest writeRequest) {
151         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
152         synchronized (s.lock) {
153             if (s.isConnected()) {
154
155                 if (!s.getTrafficMask().isWritable()) {
156                     s.pendingDataQueue.offer(writeRequest);
157                 } else {
158                     Object JavaDoc message = writeRequest.getMessage();
159
160                     int byteCount = 1;
161                     Object JavaDoc messageCopy = message;
162                     if (message instanceof ByteBuffer) {
163                         ByteBuffer rb = (ByteBuffer) message;
164                         rb.mark();
165                         byteCount = rb.remaining();
166                         ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
167                         wb.put(rb);
168                         wb.flip();
169                         rb.reset();
170                         messageCopy = wb;
171                     }
172
173                     s.increaseWrittenBytes(byteCount);
174                     s.increaseWrittenMessages();
175
176                     s.getRemoteSession().getFilterChain().fireMessageReceived(
177                             s.getRemoteSession(), messageCopy);
178                     s.getFilterChain().fireMessageSent(s, writeRequest);
179                 }
180             } else {
181                 writeRequest.getFuture().setWritten(false);
182             }
183         }
184     }
185
186     @Override JavaDoc
187     protected void doClose(IoSession session) {
188         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
189         synchronized (s.lock) {
190             if (!session.getCloseFuture().isClosed()) {
191                 s.getServiceListeners().fireSessionDestroyed(s);
192                 s.getRemoteSession().close();
193             }
194         }
195     }
196
197     // FIXME Copied and pasted from {@link ExecutorFilter}.
198
private static class EventType {
199         public static final EventType CREATED = new EventType("CREATED");
200
201         public static final EventType OPENED = new EventType("OPENED");
202
203         public static final EventType CLOSED = new EventType("CLOSED");
204
205         public static final EventType RECEIVED = new EventType("RECEIVED");
206
207         public static final EventType SENT = new EventType("SENT");
208
209         public static final EventType IDLE = new EventType("IDLE");
210
211         public static final EventType EXCEPTION = new EventType("EXCEPTION");
212
213         public static final EventType WRITE = new EventType("WRITE");
214
215         public static final EventType CLOSE = new EventType("CLOSE");
216
217         private final String JavaDoc value;
218
219         private EventType(String JavaDoc value) {
220             this.value = value;
221         }
222
223         public String JavaDoc toString() {
224             return value;
225         }
226     }
227
228     private static class Event {
229         private final EventType type;
230
231         private final Object JavaDoc data;
232
233         public Event(EventType type, Object JavaDoc data) {
234             this.type = type;
235             this.data = data;
236         }
237
238         public Object JavaDoc getData() {
239             return data;
240         }
241
242         public EventType getType() {
243             return type;
244         }
245     }
246 }
247
Popular Tags