KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > freecs > core > CentralSelector


1 /**
2  * Copyright (C) 2003 Manfred Andres
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17  */

18
19 /**
20  * The CentralSelector does the actual IO. All connections get Registered
21  * and will be served when they are ready for action.
22  *
23  * When a connection comes in, it get's registered with the CentralSelector.
24  * If a connectino has content, the content will be automatically directed to
25  * one RequestReader's io-queue
26  * If content must be written, the ConnectinoBuffer's write-queue attached to
27  * the connection will store it, untill the connection is ready for write-action
28  */

29 package freecs.core;
30
31 import freecs.Server;
32 import freecs.util.ObjectBuffer;
33 import java.util.Set JavaDoc;
34 import java.util.Iterator JavaDoc;
35 import java.io.IOException JavaDoc;
36 import java.net.Socket JavaDoc;
37 import java.nio.channels.Selector JavaDoc;
38 import java.nio.channels.SelectionKey JavaDoc;
39 import java.nio.channels.SocketChannel JavaDoc;
40 import java.nio.channels.spi.SelectorProvider JavaDoc;
41 import java.nio.channels.ClosedChannelException JavaDoc;
42 import java.nio.channels.CancelledKeyException JavaDoc;
43
44 public class CentralSelector extends Thread JavaDoc {
45    public static boolean stopped = false;
46    public static final CentralSelector cSel = new CentralSelector();
47    private Selector JavaDoc sel = null;
48    private long rqLastChecked, nextUnavailableMessage=0;
49    public ObjectBuffer dropKeys;
50    
51    public ObjectBuffer reqQueue = new ObjectBuffer (Server.srv.MAX_READERS*10);
52
53    private CentralSelector () {
54       dropKeys = new ObjectBuffer (10000);
55       if (!initCsel ())
56          Server.log (this, "construct: unable to init Csel", Server.MSG_ERROR, Server.LVL_HALT);
57    }
58
59    private boolean initCsel () {
60         if (sel == null || !sel.isOpen ()) try {
61             sel = SelectorProvider.provider ().openSelector ();
62         } catch (IOException JavaDoc ioe) {
63             Server.debug (this, "initCsel:", ioe, Server.MSG_ERROR, Server.LVL_HALT);
64             return false;
65         }
66         if (sel != null && sel.isOpen ())
67             return true;
68         return false;
69    }
70
71     public static void startCentralSelector () {
72         cSel.setName("CentralSelector");
73         if (!cSel.isAlive())
74             cSel.start();
75         // cSel.setPriority(MAX_PRIORITY-2);
76
}
77
78    public int keyCount () {
79       Set JavaDoc keys = sel.keys ();
80       return keys.size ();
81    }
82    
83     public void registerSC (SocketChannel JavaDoc sc, int reqType) throws IOException JavaDoc, ClosedChannelException JavaDoc {
84         if (sc == null) return;
85         sc.configureBlocking (false);
86         ConnectionBuffer cb = new ConnectionBuffer (reqType);
87         cb.setKey (sc.register (sel, SelectionKey.OP_READ, cb));
88     }
89
90     public void run () {
91         Server.log (this, "starting up", Server.MSG_STATE, Server.LVL_MINOR);
92         int sdc = 500;
93         long lastMessage = 0;
94         Thread JavaDoc katc = new Thread JavaDoc(new KeepAliveTimeoutChecker());
95         katc.start();
96         while (Server.srv.isRunning () || sel.keys().size() > 0) try {
97             if (!Server.srv.isRunning ()) {
98                 sdc--;
99                 if (sdc <= 0) break;
100             }
101             if (Server.DEBUG || lastMessage + 5000 > System.currentTimeMillis()) {
102                 Server.log (this, "loopstart: known sockets=" + sel.keys().size(), Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
103                 lastMessage = System.currentTimeMillis();
104             }
105             while (!dropKeys.isEmpty()) {
106                 SelectionKey JavaDoc sc;
107                 synchronized (dropKeys) {
108                     sc = (SelectionKey JavaDoc) dropKeys.pop();
109                     dropKeys.notify();
110                 }
111                 implCloseChannel (sc);
112             }
113             long now = System.currentTimeMillis();
114             try {
115                 if (sel.selectNow() < 1) {
116                     try {
117                         Thread.sleep (33);
118                     } catch (InterruptedException JavaDoc ie) { }
119                     continue;
120                 }
121             } catch (Exception JavaDoc e) {
122                 Server.debug (this, "run (select):", e, Server.MSG_ERROR , Server.LVL_MAJOR);
123             }
124             Set JavaDoc keys = sel.selectedKeys();
125             if (keys!=null && !keys.isEmpty()) {
126                 for (Iterator JavaDoc i = keys.iterator (); i.hasNext (); ) {
127                     SelectionKey JavaDoc ck = (SelectionKey JavaDoc) i.next ();
128                     i.remove();
129                     try {
130                         if (!CentralSelector.isSkValid(ck)) {
131                             Server.log (this, "run: current key is invalid", Server.MSG_STATE, Server.LVL_VERBOSE);
132                             continue;
133                         }
134                         if (ck.isReadable ()) {
135                             readIn (ck);
136                         }
137                     } catch (CancelledKeyException JavaDoc cke) { }
138                 }
139             }
140             try {
141                 Thread.sleep (33);
142             } catch (InterruptedException JavaDoc ie) { }
143         } catch (Exception JavaDoc e) {
144             Server.debug (this, "(outer loop): ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
145         }
146         katc.interrupt();
147         if (sel != null) try {
148             Server.log (this, "closing down", Server.MSG_ERROR, Server.LVL_MAJOR);
149             sel.close ();
150         } catch (Exception JavaDoc e) {
151             Server.debug (this, "shutting down: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
152         }
153         Server.log (this, "suspended", Server.MSG_ERROR, Server.LVL_MINOR);
154         stopped = true;
155         // cSelList.remove (this);
156
// Server.log (cSelList.size () + " CentralSelectors in cSelList", Server.MSG_STATE, Server.LVL_MINOR);
157
}
158
159     private void readIn (SelectionKey JavaDoc sk) {
160         if (!CentralSelector.isSkValid(sk)) {
161             Server.log (this, "readIn: current request has invalid key", Server.MSG_STATE, Server.LVL_VERBOSE);
162             return;
163         }
164         ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
165         int bytesRead;
166         try {
167             synchronized (cb) {
168                 SocketChannel JavaDoc sc = (SocketChannel JavaDoc) sk.channel ();
169                 bytesRead = sc.read (cb.rBuf);
170                 if (bytesRead < 0) {
171                     // Server.log ("CentralSelector.readIn: droped key", Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
172
implCloseChannel (sk);
173                     return;
174                 } else if (bytesRead == 0) {
175                     Server.log (this, "readIn: no data from socket", Server.MSG_STATE, Server.LVL_VERBOSE);
176                     return;
177                 }
178                 cb.updateKeepAliveTimeout();
179                 cb.currentRequest = cb.append();
180                 if (cb.currentRequest != null) {
181                     addRequest(sk, cb);
182                 }
183                 return;
184             }
185         } catch (IOException JavaDoc ioe) {
186             Server.debug (this, "readIn: droped key (IOException)", ioe, Server.MSG_ERROR, Server.LVL_VERY_VERBOSE);
187             implCloseChannel (sk);
188             cb.logError (ioe.getMessage());
189         } catch (Exception JavaDoc e) {
190             Server.debug (this, "readIn: Exception encountered while reading: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
191             implCloseChannel (sk);
192             cb.logError (e.getMessage());
193         }
194     }
195     
196     public void addRequest(SelectionKey JavaDoc sk, ConnectionBuffer cb) {
197         if (Server.srv.USE_CENTRAL_REQUESTQUEUE
198                 && !this.addRequestToQueue (sk)) {
199             implCloseChannel (sk);
200             if (nextUnavailableMessage >= System.currentTimeMillis())
201                 return;
202             cb.logError ("RequestQueue is full");
203             Server.log (this, "readIn: RequestQueue is full", Server.MSG_ERROR, Server.LVL_MAJOR);
204             nextUnavailableMessage += 1000;
205         } else if (!Server.srv.USE_CENTRAL_REQUESTQUEUE
206                 && !RequestReader.processRequest(sk)) {
207             implCloseChannel (sk);
208             if (nextUnavailableMessage >= System.currentTimeMillis())
209                 return;
210             cb.logError ("No available requestreader");
211             Server.log (this, "readIn: No availabel requestreader to process request", Server.MSG_ERROR, Server.LVL_MAJOR);
212             nextUnavailableMessage += 1000;
213         }
214     }
215
216     private void implCloseChannel (SelectionKey JavaDoc sk) {
217         try {
218             ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
219             if (cb != null) {
220                 cb.invalidate();
221                 User u = cb.getUser();
222                 if (u!=null && sk.equals(u.getKey()) && !u.isRemoving() && !u.isLoggedOut()) {
223                     StringBuffer JavaDoc sb = new StringBuffer JavaDoc ("implCloseChannel: droped key for user ").append (u.getName ());
224                     Server.log ("static CentralSelector", sb.toString (), Server.MSG_STATE, Server.LVL_VERBOSE);
225                     u.scheduleToRemove();
226                 }
227             }
228             SocketChannel JavaDoc sc = (SocketChannel JavaDoc) sk.channel();
229             Responder.res.dropChannel(sc);
230             synchronized (sc) {
231                 Socket JavaDoc s = sc.socket();
232                 s.close();
233                 sc.close();
234             }
235             sk.cancel();
236         } catch (Exception JavaDoc e) {
237             Server.debug (this, "closeChannel: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
238             sk.cancel();
239         }
240     }
241
242     public static void dropKey (SelectionKey JavaDoc sk) {
243         if (sk == null) return;
244         ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
245         if (cb != null) {
246             cb.invalidate();
247         }
248         addToDropKeys (sk);
249     }
250
251     public static void dropChannel (SocketChannel JavaDoc sc) {
252         SelectionKey JavaDoc sk = sc.keyFor(cSel.sel);
253         if (sk == null) {
254             try {
255                 sc.close();
256             } catch (IOException JavaDoc e) {
257                 Server.debug ("static CentralSelector", "dropChannle:", e, Server.MSG_ERROR, Server.LVL_MAJOR);
258             }
259             return;
260         }
261         ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
262         if (cb != null) {
263             cb.invalidate();
264         }
265         addToDropKeys (sk);
266     }
267
268
269     private static void addToDropKeys (SelectionKey JavaDoc sk) {
270         long now = System.currentTimeMillis();
271         long stop = now + 5000;
272         synchronized (cSel.dropKeys) {
273             boolean success=cSel.dropKeys.put(sk);
274             while (!success && stop > now) {
275                 try {
276                     now = System.currentTimeMillis();
277                     long waitTime = stop - now;
278                     if (waitTime > 32)
279                         cSel.dropKeys.wait(stop - now);
280                 } catch (InterruptedException JavaDoc ie) { }
281                 success=cSel.dropKeys.put(sk);
282             }
283             if (!success)
284                 Server.log("static CentralSelector", "dropKey: unable to add dropkey", Server.MSG_ERROR, Server.LVL_MAJOR);
285             cSel.dropKeys.notify();
286         }
287     }
288
289     public static boolean isSkValid (SelectionKey JavaDoc sk) {
290         if (!chkSk(sk)) {
291             if (sk != null && cSel.equals(sk.selector()))
292                 dropKey (sk);
293             return false;
294         }
295         return true;
296     }
297     
298     private static boolean chkSk (SelectionKey JavaDoc sk) {
299         if (sk == null)
300             return false;
301         try {
302             ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
303             if (cb == null || !cb.isValid())
304                 return false;
305             if (!sk.isValid() || !sk.channel().isOpen()) {
306                 cb.invalidate();
307                 return false;
308             }
309             Socket JavaDoc s = ((SocketChannel JavaDoc) sk.channel()).socket();
310             if (s.isInputShutdown() || s.isOutputShutdown()) {
311                 cb.invalidate();
312                 return false;
313             }
314             if (cb != null) {
315                 if (!cb.isValid())
316                     return false;
317             }
318         } catch (Exception JavaDoc e) {
319             Server.debug ("static CentralSelector", "SelectionKey-Check:", e, Server.MSG_ERROR, Server.LVL_MAJOR);
320             return false;
321         }
322         return true;
323     }
324     
325     private boolean addRequestToQueue(SelectionKey JavaDoc sk) {
326         long stop = System.currentTimeMillis() + 1000;
327         boolean success=false;
328         try {
329             synchronized (this.reqQueue) {
330                 if (reqQueue.contains(sk))
331                     return true;
332                 success = reqQueue.put(sk);
333                 while (!success
334                         && stop > System.currentTimeMillis()) {
335                     this.reqQueue.wait(stop - System.currentTimeMillis());
336                     success = reqQueue.put(sk);
337                 }
338                 if (success)
339                     this.reqQueue.notify();
340             }
341         } catch (Exception JavaDoc e) {
342             Server.debug (this, "addRequestToQueue caused exception:", e, Server.MSG_ERROR, Server.LVL_MAJOR);
343         }
344         if (reqQueue.size() > ((this.reqQueue.capacity()/1.5)/RequestReader.activeReaders()))
345             RequestReader.startRequestReader(false);
346         return success;
347     }
348
349     public boolean equals (Object JavaDoc o) {
350         return o instanceof Selector JavaDoc && o.equals(sel);
351     }
352
353     public String JavaDoc toString() {
354         return "[CentralSelector]";
355     }
356     
357     private class KeepAliveTimeoutChecker implements Runnable JavaDoc {
358         private short loglvl = Server.LVL_VERY_VERBOSE;
359         KeepAliveTimeoutChecker() { }
360         public void run () {
361             long nextCheck = 0;
362             while (Server.srv.isRunning()) {
363                 long now = System.currentTimeMillis();
364                 if (nextCheck>now) {
365                     long diff = Math.max(nextCheck - now, 33);
366                     Server.log("KeepAliveCheck", "sleeping for " + diff + " millis", Server.MSG_STATE, loglvl);
367                     try {
368                         Thread.sleep(diff);
369                     } catch (InterruptedException JavaDoc ie) { /* ok */ }
370                     now = System.currentTimeMillis();
371                 }
372                 nextCheck = now + Server.srv.KEEP_ALIVE_TIMEOUT;
373                 SelectionKey JavaDoc[] checkArr;
374                 Server.log("KeepAliveCheck", "sync on selector", Server.MSG_STATE, loglvl);
375                 synchronized (CentralSelector.cSel.sel) {
376                     if (!CentralSelector.cSel.sel.isOpen()) {
377                         Server.log ("KeepAliveTimeoutChecker", "Selector closed. Shutting down KeepAliveTimeoutChecker", Server.MSG_STATE, Server.LVL_MINOR);
378                         return;
379                     }
380                     Set JavaDoc keyset = CentralSelector.cSel.sel.keys();
381                     Server.log("KeepAliveCheck", "sync on selectors keyset", Server.MSG_STATE, loglvl);
382                     synchronized (keyset) {
383                         checkArr = (SelectionKey JavaDoc[]) keyset.toArray(new SelectionKey JavaDoc[0]);
384                     }
385                 }
386                 Server.log("KeepAliveCheck", "processing " + checkArr.length + "keys", Server.MSG_STATE, loglvl);
387                 for (int i = 0; i < checkArr.length; i++) {
388                     SelectionKey JavaDoc sk = checkArr[i];
389                     if (!sk.isValid() || !sk.channel().isOpen())
390                         continue;
391                     ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
392                     synchronized (cb) {
393                         long kato = cb.getKeepAliveTimeout(now);
394                         if (kato < 0) // no timeout
395
continue;
396                         if (kato <= now) {
397                             Server.log("KeepAliveCheck", "closing connection to " + cb.conn, Server.MSG_STATE, loglvl);
398                             CentralSelector.dropKey(sk);
399                         } else if (kato < nextCheck)
400                             nextCheck = kato;
401                     }
402                 }
403                 Server.log("KeepAliveCheck", "checking took me " + (System.currentTimeMillis()-now) + " millis", Server.MSG_STATE, loglvl);
404             }
405         }
406     }
407 }
408
Popular Tags