KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > xsocket > Dispatcher


1 /// $Id: Dispatcher.java 1568 2007-07-26 06:47:32Z grro $
2
/*
3  * Copyright (c) xsocket.org, 2006 - 2007. All rights reserved.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
20  * The latest copy of this software may be found on http://www.xsocket.org/
21  */

22 package org.xsocket;
23
24
25 import java.io.IOException JavaDoc;
26 import java.nio.channels.SelectionKey JavaDoc;
27 import java.nio.channels.Selector JavaDoc;
28 import java.util.HashSet JavaDoc;
29 import java.util.Iterator JavaDoc;
30 import java.util.Set JavaDoc;
31 import java.util.logging.Level JavaDoc;
32 import java.util.logging.Logger JavaDoc;
33
34
35
36 /**
37  * implementation of the {@link IDispatcher}
38  * <br><br>
39  * All dispatcher methods are thread save.
40  *
41  * @author grro@xsocket.org
42  */

43 public class Dispatcher<T extends IHandle> implements IDispatcher<T> {
44     
45     private static final Logger JavaDoc LOG = Logger.getLogger(Dispatcher.class.getName());
46     
47     private static final long TIMEOUT_SHUTDOWN_MILLIS = 5L * 1000L;
48     
49     // is open flag
50
private boolean isOpen = true;
51
52     // guard object for synchronizing
53
private Object JavaDoc dispatcherThreadGuard = new Object JavaDoc();
54
55     // connection handling
56
private Selector JavaDoc selector = null;
57     
58     // event handler
59
private IEventHandler<T> eventHandler = null;
60     
61     
62     
63     // statistics
64
private long handledRegistractions = 0;
65     private long handledReads = 0;
66     private long handledWrites = 0;
67     
68     /**
69      * constructor
70      *
71      * @param eventHandler the assigned event handler
72      */

73     public Dispatcher(IEventHandler<T> eventHandler) {
74         assert (eventHandler != null) : "null is not allowed for event handler ";
75         
76         this.eventHandler = eventHandler;
77         
78
79         if (LOG.isLoggable(Level.FINE)) {
80             LOG.fine("dispatcher " + this.hashCode() + " has been created (eventHandler=" + eventHandler + ")");
81         }
82         
83         try {
84             selector = Selector.open();
85         } catch (IOException JavaDoc ioe) {
86             String JavaDoc text = "exception occured while opening selector. Reason: " + ioe.toString();
87             LOG.severe(text);
88             throw new RuntimeException JavaDoc(text, ioe);
89         }
90     }
91         
92
93     /**
94      * {@inheritDoc}
95      */

96     public final IEventHandler<T> getEventHandler() {
97         return eventHandler;
98     }
99     
100
101     /**
102      * {@inheritDoc}
103      */

104     public void register(T handle, int ops) throws IOException JavaDoc {
105         assert (!handle.getChannel().isBlocking());
106         
107         if (LOG.isLoggable(Level.FINE)) {
108             LOG.fine("register handle " + handle);
109         }
110                 
111         synchronized (dispatcherThreadGuard) {
112             selector.wakeup();
113             
114             handle.getChannel().register(selector, ops, handle);
115             eventHandler.onHandleRegisterEvent(handle);
116         }
117         
118         
119         handledRegistractions++;
120     }
121
122     
123     /**
124      * {@inheritDoc}
125      */

126     public void deregister(final T handle) throws IOException JavaDoc {
127
128         synchronized (dispatcherThreadGuard) {
129             selector.wakeup();
130             
131             SelectionKey JavaDoc key = handle.getChannel().keyFor(selector);
132             if (key.isValid()) {
133                 key.cancel();
134             }
135         }
136     }
137
138         
139     /**
140      * {@inheritDoc}
141      */

142     @SuppressWarnings JavaDoc("unchecked")
143     public final Set JavaDoc<T> getRegistered() {
144
145         Set JavaDoc<T> registered = new HashSet JavaDoc<T>();
146
147         if (selector != null) {
148             SelectionKey JavaDoc[] selKeys = null;
149             synchronized (dispatcherThreadGuard) {
150                 selector.wakeup();
151
152                 Set JavaDoc<SelectionKey JavaDoc> keySet = selector.keys();
153                 selKeys = keySet.toArray(new SelectionKey JavaDoc[keySet.size()]);
154             }
155
156             try {
157                 for (SelectionKey JavaDoc key : selKeys) {
158                     T handle = (T) key.attachment();
159                     registered.add(handle);
160                 }
161             } catch (Exception JavaDoc ignore) { }
162         }
163         
164         return registered;
165     }
166     
167     
168     
169     /**
170      * {@inheritDoc}
171      */

172     public final void updateInterestSet(T handle, int ops) throws IOException JavaDoc {
173         SelectionKey JavaDoc key = handle.getChannel().keyFor(selector);
174         
175         if (key != null) {
176             synchronized (dispatcherThreadGuard) {
177                 if (key.isValid()) {
178                     key.selector().wakeup();
179                     key.interestOps(ops);
180                     
181                     if (LOG.isLoggable(Level.FINER)) {
182                         LOG.finer("interest ops for " + handle + " updated to " + ops);
183                     }
184                 } else {
185                     throw new IOException JavaDoc("handle " + handle + " is invalid ");
186                 }
187             }
188         }
189     }
190
191
192     
193     /**
194      * {@inheritDoc}
195      */

196     @SuppressWarnings JavaDoc("unchecked")
197     public final void run() {
198         
199         if (LOG.isLoggable(Level.FINE)) {
200             LOG.fine("selector listening ...");
201         }
202         
203         
204         while(isOpen) {
205             try {
206                 
207                 // see http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf
208
synchronized (dispatcherThreadGuard) {
209                     /* suspend the dispatcher thead */
210                 }
211                 
212                 // waiting for new events (data, ...)
213
int eventCount = selector.select(1000);
214
215                 // handle read write events
216
if (eventCount > 0) {
217                     Set JavaDoc selectedEventKeys = selector.selectedKeys();
218                     Iterator JavaDoc it = selectedEventKeys.iterator();
219
220                     // handle read & write
221
while (it.hasNext()) {
222                         SelectionKey JavaDoc eventKey = (SelectionKey JavaDoc) it.next();
223                         it.remove();
224
225                         T handle = (T) eventKey.attachment();
226                                 
227                         // read data
228
if (eventKey.isValid() && eventKey.isReadable()) {
229
230                             // notify event handler
231
eventHandler.onHandleReadableEvent(handle);
232                             handledReads++;
233                         }
234
235                         // write data
236
if (eventKey.isValid() && eventKey.isWritable()) {
237                             handledWrites++;
238
239                             // notify event handler
240
eventHandler.onHandleWriteableEvent(handle);
241                         }
242                     }
243                 }
244
245             } catch (Throwable JavaDoc e) {
246                 LOG.warning("exception occured while processing. Reason " + e.toString());
247             }
248         }
249
250                 
251         closeDispatcher();
252     }
253
254
255     @SuppressWarnings JavaDoc("unchecked")
256     private void closeDispatcher() {
257         LOG.fine("closing connections");
258             
259       
260             
261             
262         if (selector != null) {
263             try {
264                 selector.close();
265             } catch (Exception JavaDoc e) {
266                 if (LOG.isLoggable(Level.FINE)) {
267                     LOG.fine("error occured by close selector within tearDown " + e.toString());
268                 }
269             }
270         }
271     }
272
273     
274     
275     /**
276      * {@inheritDoc}
277      */

278     public void close() {
279         if (isOpen) {
280             if (selector != null) {
281                 
282                 // initiate closing of open connections
283
Set JavaDoc<T> openHandles = getRegistered();
284                 final int openConnections = openHandles.size();
285                 for (T handle : openHandles) {
286                     eventHandler.onDispatcherCloseEvent(handle);
287                 }
288                 
289                 // start closer thread
290
Thread JavaDoc closer = new Thread JavaDoc() {
291                     @Override JavaDoc
292                     public void run() {
293                         long start = System.currentTimeMillis();
294                         
295                         int terminatedConnections = 0;
296                         do {
297                             try {
298                                 Thread.sleep(100);
299                             } catch (InterruptedException JavaDoc ignore) { }
300                             
301                             if (System.currentTimeMillis() > (start + TIMEOUT_SHUTDOWN_MILLIS)) {
302                                 LOG.warning("shutdown timeout reached (" + DataConverter.toFormatedDuration(TIMEOUT_SHUTDOWN_MILLIS) + "). kill pending connections");
303                                 for (SelectionKey JavaDoc sk : selector.keys()) {
304                                     try {
305                                         terminatedConnections++;
306                                         sk.channel().close();
307                                     } catch (Exception JavaDoc ignore) { }
308                                 }
309                                 
310                                 break;
311                             }
312                         } while (getRegistered().size() > 0);
313     
314                         isOpen = false;
315                         // wake up selector, so that isRunning-loop can be terminated
316
selector.wakeup();
317                         
318                         if ((openConnections > 0) || (terminatedConnections > 0)) {
319                             if ((openConnections > 0) && (terminatedConnections > 0)) {
320                                 LOG.info((openConnections - terminatedConnections) + " connections has been closed properly, "
321                                           + terminatedConnections + " connections has been terminate unclean");
322                             }
323                         }
324                     
325                         
326                         if (LOG.isLoggable(Level.FINE)) {
327                             LOG.fine("dispatcher " + this.hashCode() + " has been closed (shutdown time = " + DataConverter.toFormatedDuration(System.currentTimeMillis() - start) + ")");
328                         }
329                     }
330                 };
331                 closer.setName("xDispatcherCloser");
332                 closer.start();
333             }
334         }
335     }
336
337     
338     /**
339      * check if this dispatcher is open
340      * @return true, if the disptacher is open
341      */

342     public final boolean isOpen() {
343         return isOpen;
344     }
345     
346     
347
348
349
350     /**
351      * {@inheritDoc}
352      */

353     public final long getNumberOfHandledRegistrations() {
354         return handledRegistractions;
355     }
356     
357     
358     /**
359      * {@inheritDoc}
360      */

361     public final long getNumberOfHandledReads() {
362         return handledReads;
363     }
364     
365     
366     /**
367      * {@inheritDoc}
368      */

369     public final long getNumberOfHandledWrites() {
370         return handledWrites;
371     }
372 }
373
Popular Tags