KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > sapia > ubik > net > nio > acceptor > Acceptor


1 package org.sapia.ubik.net.nio.acceptor;
2
3 import java.io.IOException JavaDoc;
4 import java.nio.channels.Channel JavaDoc;
5 import java.nio.channels.ClosedByInterruptException JavaDoc;
6 import java.nio.channels.SelectionKey JavaDoc;
7 import java.nio.channels.Selector JavaDoc;
8 import java.util.Iterator JavaDoc;
9
10 import org.sapia.ubik.net.NestedIOException;
11 import org.sapia.ubik.net.nio.ChannelManager;
12 import org.sapia.ubik.net.nio.Cycle;
13 import org.sapia.ubik.net.nio.CycleListener;
14 import org.sapia.ubik.net.nio.Dispatcher;
15 import org.sapia.ubik.net.nio.dispatcher.DefaultDispatcher;
16 import org.sapia.ubik.util.Debug;
17 import org.sapia.ubik.util.StartStopLock;
18
19 /**
20  * @author Yanick Duchesne
21  *
22  * <dl>
23  * <dt><b>Copyright: </b>
24  * <dd>Copyright &#169; 2002-2005 <a HREF="http://www.sapia-oss.org">Sapia Open
25  * Source Software </a>. All Rights Reserved.</dd>
26  * </dt>
27  * <dt><b>License: </b>
28  * <dd>Read the license.txt file of the jar or visit the <a
29  * HREF="http://www.sapia-oss.org/license.html">license page </a> at the Sapia
30  * OSS web site</dd>
31  * </dt>
32  * </dl>
33  */

34 public class Acceptor {
35
36   private String JavaDoc _threadNamePrefix = "sapia.ubik.nio.acceptor";
37   private boolean _started, _daemon;
38   private SelectorThread _selectorThread;
39   private ServerThread _serverThread;
40   private AcceptorConfig _config;
41   private CycleListener _listener;
42
43   public Acceptor(ChannelManager manager) {
44     _config = new AcceptorConfig(manager, new DefaultDispatcher());
45   }
46
47   public Acceptor(ChannelManager manager, Dispatcher dispatcher) {
48     _config = new AcceptorConfig(manager, dispatcher);
49   }
50   
51   /**
52    * @param bufSize the size of the <code>ByteBuffer</code>s that are internally
53    * used.
54    */

55   public void setBufferSize(int bufSize){
56     _config.pool.setBufSize(bufSize);
57   }
58
59   /**
60    * @param daemon
61    * <code>true</code> if this acceptor is to run as a daemon.
62    */

63   public void setDaemon(boolean daemon) {
64     _daemon = daemon;
65   }
66
67   /**
68    * @param prefix
69    * the prefix used for names of threads that are spawned by this
70    * acceptor..
71    */

72   public void setThreadNamePrefix(String JavaDoc prefix) {
73     if(prefix != null)
74       _threadNamePrefix = prefix;
75   }
76
77   /**
78    * @param debug
79    * the <code>Debug</code> instance that this acceptor will use.
80    */

81   public void setDebug(Debug debug) {
82     _config.debug = debug;
83   }
84   
85   /**
86    * @return the address of the server channel held by this instance.
87    *
88    * @see ChannelManager#getAddress(Channel)
89    */

90   public Object JavaDoc getAddress(){
91     if(_config.server == null){
92       throw new IllegalStateException JavaDoc("Server not started");
93     }
94     
95     int maxAttempts = 3;
96     int attempt = 0;
97     while(attempt<maxAttempts){
98       Object JavaDoc address = _config.manager.getAddress(_config.server);
99       if(address != null){
100         return address;
101       }
102       try{
103         Thread.sleep(2000);
104       }catch(InterruptedException JavaDoc e){
105         throw new IllegalStateException JavaDoc("Thread interrupted");
106       }
107       attempt++;
108     }
109     throw new IllegalStateException JavaDoc("Could not acquire server address; server startup might have failed");
110   }
111   
112   /**
113    * @return the server <code>Channel</code> that this instance uses.
114    */

115   public Channel JavaDoc getChannel(){
116     if(_config.server == null){
117       throw new IllegalStateException JavaDoc("Server not started");
118     }
119
120     return _config.server;
121   }
122
123   /**
124    * Starts this instance.
125    *
126    * @throws IOException
127    * if an IO problem occurs.
128    */

129   public synchronized void start() throws IOException JavaDoc {
130
131     Selector JavaDoc selector = Selector.open();
132     _config.debug.out(getClass(), "Creating server channel");
133     Channel JavaDoc channel = _config.manager.create();
134
135     _serverThread = new ServerThread();
136     _serverThread.setDaemon(_daemon);
137     _serverThread.setName(_threadNamePrefix + ".server");
138     _serverThread._parent = this;
139
140     _selectorThread = new SelectorThread();
141     _selectorThread.setDaemon(_daemon);
142     _selectorThread.setName(_threadNamePrefix + ".selector");
143     _selectorThread._parent = this;
144
145     StateHandler[] handlers = new StateHandler[6];
146
147     handlers[Cycle.STATE_READ] = new ReadStateHandler(_config);
148     handlers[Cycle.STATE_WRITE] = new WriteStateHandler(_config);
149     handlers[Cycle.STATE_PROCESS] = new ProcessStateHandler(_config);
150     handlers[Cycle.STATE_RECYCLE] = new RecycleStateHandler(_config);
151     handlers[Cycle.STATE_COMPLETE] = new CompleteStateHandler(_config);
152     handlers[Cycle.STATE_ERROR] = new ErrorStateHandler(_config);
153
154     _config.init(channel, selector, handlers);
155
156     _listener = new AcceptorCycleListener(_config);
157
158     _serverThread.start();
159     try {
160       _serverThread._lock.waitStarted();
161     } catch(InterruptedException JavaDoc e) {
162       throw new IOException JavaDoc("Server thread interrupted at startup");
163     } catch(IOException JavaDoc e) {
164       throw e;
165     } catch(Throwable JavaDoc err) {
166       throw new NestedIOException("Error caught while starting server", err);
167     }
168
169     _selectorThread.start();
170     try {
171       _selectorThread._lock.waitStarted();
172     } catch(InterruptedException JavaDoc e) {
173       throw new IOException JavaDoc("Selector thread interrupted at startup");
174     } catch(IOException JavaDoc e) {
175       throw e;
176     } catch(Throwable JavaDoc err) {
177       throw new NestedIOException("Error caught while starting selector", err);
178     }
179
180     _config.debug.out(getClass(), "Server started");
181   }
182
183   /**
184    * Stops this instance.
185    */

186   public synchronized void stop() {
187     _config.debug.out(getClass(), "Stopping server...");
188     try {
189       _serverThread._lock.triggerStop();
190       _serverThread._lock.waitStopped();
191     } catch(Throwable JavaDoc t) {
192     }
193     try {
194       _selectorThread._lock.triggerStop();
195       _selectorThread._lock.waitStopped();
196     } catch(Throwable JavaDoc t) {
197     }
198     _config.debug.out(getClass(), "Server stopped");
199   }
200
201   ////////////////////////////// INNER CLASSES ///////////////////////////////
202

203   static final class ServerThread extends Thread JavaDoc implements
204       StartStopLock.StopRequestListener {
205
206     Acceptor _parent;
207     StartStopLock _lock = new StartStopLock(this);
208
209     public void run() {
210       Debug debug = _parent._config.debug;
211       ChannelManager manager = _parent._config.manager;
212       Channel JavaDoc server = _parent._config.server;
213       Channel JavaDoc client;
214       while(!_lock.stopRequested && !interrupted()) {
215         try {
216           _lock.notifyStarted(null);
217           client = manager.accept(server);
218           if(client != null) {
219             AcceptorCycle cycle = new AcceptorCycle(manager, client,
220                 _parent._config.pool, _parent._listener);
221             _parent._config.queue.add(cycle, false);
222           }
223         } catch(ClosedByInterruptException JavaDoc e) {
224           break;
225         } catch(IOException JavaDoc e) {
226           debug.out(getClass(), "IOException caught", e);
227           break;
228         } catch(Exception JavaDoc e) {
229           debug.out(getClass(), "Exception caught", e);
230         }
231       }
232       if(_lock.stopRequested) {
233         try {
234           server.close();
235           _parent._config.dispatcher.close();
236         } catch(IOException JavaDoc e2) {
237         }
238         _lock.notifyStopped(null);
239       }
240     }
241
242     /**
243      * @see org.sapia.ubik.util.StartStopLock.StopRequestListener#onStopRequested()
244      */

245     public void onStopRequested() throws Throwable JavaDoc {
246       interrupt();
247     }
248
249   }
250
251   static final class SelectorThread extends Thread JavaDoc implements
252       StartStopLock.StopRequestListener {
253
254     Acceptor _parent;
255     Exception JavaDoc _err;
256     StartStopLock _lock = new StartStopLock(this);
257
258     public void run() {
259       while(true){
260         try{
261           doRun();
262         }catch(RuntimeException JavaDoc e){
263         }
264         if(interrupted() || _lock.stopRequested){
265           break;
266         }
267       }
268     }
269       
270     private void doRun(){
271       Debug debug = _parent._config.debug;
272       int selected, state;
273       SelectionKey JavaDoc key = null;
274       AcceptorCycle cycle = null;
275
276       while(!_lock.stopRequested && !isInterrupted()) {
277         selected = 0;
278         try {
279           _lock.notifyStarted(null);
280           selected = _parent._config.selector.select();
281         } catch(IOException JavaDoc e) {
282           debug.out(getClass(), "Exception caught while selecting", e);
283           doStop();
284           break;
285         }
286
287         if(_parent._config.queue.wasItemAdded()) {
288           try {
289             _parent._config.queue.register();
290           } catch(IOException JavaDoc e) {
291             debug.out(getClass(), "Could not register pending clients", e);
292           }
293         }
294
295         if(selected > 0) {
296           Iterator JavaDoc itr = _parent._config.selector.selectedKeys().iterator();
297           while(itr.hasNext()) {
298             key = (SelectionKey JavaDoc) itr.next();
299             cycle = (AcceptorCycle) key.attachment();
300             if(cycle == null) {
301               handleError(cycle, new IllegalStateException JavaDoc(
302                   "Cycle instance not attached to key"));
303               continue;
304             }
305             key.cancel();
306             try {
307               _parent._config.handlers[cycle.state()].handle(cycle);
308             } catch(HandlerException e) {
309               debug.out(getClass(), e.getMessage(), e);
310               handleError((AcceptorCycle) key.attachment(), e);
311             }
312           }
313         }
314       }
315       _lock.notifyStopped(null);
316     }
317
318     /**
319      * @see org.sapia.ubik.util.StartStopLock.StopRequestListener#onStopRequested()
320      */

321     public void onStopRequested() throws Throwable JavaDoc {
322       interrupt();
323       _parent._config.selector.wakeup();
324     }
325
326     private synchronized void doStop() {
327       if(_parent._config.selector != null) {
328         Iterator JavaDoc keys = _parent._config.selector.keys().iterator();
329         SelectionKey JavaDoc key;
330         while(keys.hasNext()) {
331           key = (SelectionKey JavaDoc) keys.next();
332           Object JavaDoc attached = key.attachment();
333           if(attached instanceof AcceptorCycle) {
334             Cycle cycle = (Cycle) key.attachment();
335             cycle.destroy();
336           }
337         }
338         try {
339           _parent._config.selector.close();
340         } catch(IOException JavaDoc e) {
341         }
342       }
343     }
344
345     private void handleError(Cycle cycle, Throwable JavaDoc t) {
346       if(cycle != null) {
347         cycle.destroy();
348       }
349     }
350   }
351
352 }
353
Popular Tags