KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > xsocket > stream > io > impl > IoMultithreadedHandler


1 // $Id: IoDelayWriteHandler.java 1316 2007-06-10 08:51:18Z grro $
2
/*
3  * Copyright (c) xsocket.org, 2006 - 2007. All rights reserved.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
20  * The latest copy of this software may be found on http://www.xsocket.org/
21  */

22 package org.xsocket.stream.io.impl;
23
24 import java.io.IOException JavaDoc;
25 import java.nio.ByteBuffer JavaDoc;
26 import java.util.LinkedList JavaDoc;
27 import java.util.Queue JavaDoc;
28 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
29 import java.util.logging.Level JavaDoc;
30 import java.util.logging.Logger JavaDoc;
31
32 import org.xsocket.stream.io.spi.IIoHandlerCallback;
33 import org.xsocket.stream.io.spi.IIoHandlerContext;
34
35
36
37
38 /**
39  * Delayed write IO handler
40  *
41  * @author grro@xsocket.org
42  */

43 final class IoMultithreadedHandler extends ChainableIoHandler {
44
45     private static final Logger JavaDoc LOG = Logger.getLogger(IoMultithreadedHandler.class.getName());
46     
47     
48     private String JavaDoc id = "<null>";
49     private IIoHandlerContext ctx = null;
50     private final TaskQueue taskQueue = new TaskQueue();
51
52     private final IOEventHandler eventHandler = new IOEventHandler();
53     
54     
55     /**
56      * constructor
57      * @param successor the successor
58      */

59     IoMultithreadedHandler(ChainableIoHandler successor, IIoHandlerContext ctx) {
60         super(successor);
61         this.ctx = ctx;
62         
63         setSuccessor(successor);
64     }
65     
66     public void init(IIoHandlerCallback callbackHandler) throws IOException JavaDoc {
67         setPreviousCallback(callbackHandler);
68         getSuccessor().init(eventHandler);
69     }
70
71     
72
73
74
75
76     /**
77      * {@inheritDoc}
78      */

79     public LinkedList JavaDoc<ByteBuffer JavaDoc> drainIncoming() {
80         return getSuccessor().drainIncoming();
81     }
82     
83     
84     /**
85      * {@inheritDoc}
86      */

87     public void close(boolean immediate) throws IOException JavaDoc {
88         if (!immediate) {
89             flushOutgoing();
90         }
91
92         getSuccessor().close(immediate);
93     }
94     
95
96     /**
97      * {@inheritDoc}
98      */

99     public void writeOutgoing(ByteBuffer JavaDoc buffer) throws IOException JavaDoc {
100         getSuccessor().writeOutgoing(buffer);
101     }
102     
103     
104     /**
105      * {@inheritDoc}
106      */

107     public void writeOutgoing(LinkedList JavaDoc<ByteBuffer JavaDoc> buffers) throws IOException JavaDoc {
108         getSuccessor().writeOutgoing(buffers);
109     }
110     
111
112     
113     /**
114      * {@inheritDoc}
115      */

116     public void flushOutgoing() throws IOException JavaDoc {
117         getSuccessor().flushOutgoing();
118     }
119
120     
121     private final class IOEventHandler implements IIoHandlerCallback {
122         
123         public void onWriteException(IOException JavaDoc ioException) {
124             getPreviousCallback().onWriteException(ioException);
125         }
126
127         public void onWritten() {
128             getPreviousCallback().onWritten();
129         }
130
131                 
132         public void onConnectionAbnormalTerminated() {
133             getPreviousCallback().onConnectionAbnormalTerminated();
134         }
135
136         
137         public void onConnect() {
138             if (ctx.isAppHandlerListenForConnectEvent()) {
139                 Runnable JavaDoc task = new Runnable JavaDoc() {
140                     public void run() {
141                         try {
142                             getPreviousCallback().onConnect();
143                         } catch (Exception JavaDoc e) {
144                             if (LOG.isLoggable(Level.FINE)) {
145                                 LOG.fine("[" + id + "] error occured by handling connect. Reason: " + e.toString());
146                             }
147                         }
148                     }
149                 };
150                     
151                 taskQueue.processTask(task);
152             }
153         }
154
155         
156         public void onDataRead() {
157             if (ctx.isAppHandlerListenForDataEvent()) {
158                 Runnable JavaDoc task = new Runnable JavaDoc() {
159                     public void run() {
160                         try {
161                             getPreviousCallback().onDataRead();
162                         } catch (Exception JavaDoc e) {
163                             if (LOG.isLoggable(Level.FINE)) {
164                                 LOG.fine("[" + id + "] error occured by handling data. Reason: " + e.toString());
165                             }
166                         }
167                     }
168                 };
169                 
170                 taskQueue.processTask(task);
171             }
172         }
173
174
175         
176         public void onDisconnect() {
177             if (ctx.isAppHandlerListenforDisconnectEvent()) {
178                 Runnable JavaDoc task = new Runnable JavaDoc() {
179                     public void run() {
180                         try {
181                             getPreviousCallback().onDisconnect();
182                         } catch (Exception JavaDoc e) {
183                             if (LOG.isLoggable(Level.FINE)) {
184                                 LOG.fine("[" + id + "] error occured by handling connect. Reason: " + e.toString());
185                             }
186                         }
187                     }
188                 };
189                 
190                 taskQueue.processTask(task);
191             }
192
193         }
194
195         
196         public void onConnectionTimeout() {
197             Runnable JavaDoc task = new Runnable JavaDoc() {
198                 public void run() {
199                     try {
200                         getPreviousCallback().onConnectionTimeout();
201                     } catch (Exception JavaDoc e) {
202                         if (LOG.isLoggable(Level.FINE)) {
203                             LOG.fine("[" + id + "] error occured by handling onConnectionTimeout. Reason: " + e.toString());
204                         }
205                     }
206                 }
207             };
208             
209             taskQueue.processTask(task);
210         }
211
212         
213         public void onIdleTimeout() {
214             Runnable JavaDoc task = new Runnable JavaDoc() {
215                 public void run() {
216                     try {
217                         getPreviousCallback().onIdleTimeout();
218                     } catch (Exception JavaDoc e) {
219                         if (LOG.isLoggable(Level.FINE)) {
220                             LOG.fine("[" + id + "] error occured by handling onIdleTimeout. Reason: " + e.toString());
221                         }
222                     }
223                 }
224             };
225                 
226             taskQueue.processTask(task);
227         }
228     }
229     
230     
231     
232     
233     private final class TaskQueue {
234         
235         private final TaskQueueProcessor taskProcessor = new TaskQueueProcessor();
236         
237         private final Queue JavaDoc<Runnable JavaDoc> tasks = new ConcurrentLinkedQueue JavaDoc<Runnable JavaDoc>();
238         
239         public void processTask(Runnable JavaDoc task) {
240             
241             // workpool available (task will performed be handled within worker thread)? -> put task into fifo queue and process it to ensure the right order
242
if (ctx.getWorkerpool() != null) {
243                 
244                 // add task to task queue
245
tasks.offer(task);
246         
247                 // process the task
248
ctx.getWorkerpool().execute(taskProcessor);
249                 
250                 
251             // no workerpool (no multithreading, task will be performed within acceptor/disptacher thread)
252
} else {
253                 task.run();
254             }
255         }
256     }
257     
258
259     private final class TaskQueueProcessor implements Runnable JavaDoc {
260         
261         public void run() {
262             if (!ctx.isAppHandlerThreadSafe()) {
263                 synchronized (IoMultithreadedHandler.this) {
264                     Runnable JavaDoc task = taskQueue.tasks.poll();
265                     processTask(task);
266                 }
267             } else {
268                 Runnable JavaDoc task = null;
269                 task = taskQueue.tasks.poll();
270                 processTask(task);
271             }
272         }
273
274         private void processTask(Runnable JavaDoc task) {
275             if (task != null) {
276                 try {
277                     task.run();
278                 } catch (Exception JavaDoc e) {
279                     if (LOG.isLoggable(Level.FINE)) {
280                         LOG.fine("error occured by proccesing task " + task);
281                     }
282                 }
283             }
284         }
285     }
286     
287 }
288
Popular Tags