KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > PoolNetwork


1 /*
2  * Copyright (C) 2004 - 2006 ScalAgent Distributed Technologies
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17  * USA.
18  *
19  * Initial developer(s): ScalAgent Distributed Technologies
20  * Contributor(s):
21  */

22 package fr.dyade.aaa.agent;
23
24 import java.io.*;
25 import java.net.*;
26 import java.util.*;
27
28 import org.objectweb.util.monolog.api.BasicLevel;
29 import org.objectweb.util.monolog.api.Logger;
30
31 import fr.dyade.aaa.util.*;
32
/**
 * <code>PoolNetwork</code> is an implementation of the
 * <code>StreamNetwork</code> class that manages multiple connections.
 */

37 public class PoolNetwork extends StreamNetwork {
38   /** */
39   WakeOnConnection wakeOnConnection = null;
40   /** */
41   NetSession sessions[] = null;
42   /** */
43   Dispatcher dispatcher = null;
44   /** */
45   WatchDog watchDog = null;
46
47   static int nbMaxCnx;
48   int nbActiveCnx = 0;
49   NetSession activeSessions[];
50   long current = 0L;
51
/**
 * Creates a new network component. The component is not usable until
 * init() has been called.
 *
 * @throws Exception if the superclass constructor fails.
 */
public PoolNetwork() throws Exception {
  super();
}
58
59   /**
60    * Initializes a new network component. This method is used in order to
61    * easily creates and configure a Network component from a class name.
62    * So we can use the <code>Class.newInstance()</code> method for create
63    * (whitout any parameter) the component, then we can initialize it with
64    * this method.<br>
65    * This method initializes the logical clock for the domain.
66    *
67    * @param name The domain name.
68    * @param port The listen port.
69    * @param servers The list of servers directly accessible from this
70    * network interface.
71    */

72   public void init(String JavaDoc name, int port, short[] servers) throws Exception JavaDoc {
73     super.init(name, port, servers);
74
75     // Creates a session for all domain's server.
76
sessions = new NetSession[servers.length];
77     for (int i=0; i<sessions.length; i++) {
78       if (servers[i] != AgentServer.getServerId())
79         sessions[i] = new NetSession(getName(), servers[i]);
80     }
81     wakeOnConnection = new WakeOnConnection(getName(), logmon);
82     dispatcher = new Dispatcher(getName(), logmon);
83     watchDog = new WatchDog(getName(), logmon);
84   }
85
86   /**
87    * Causes this network component to begin execution.
88    */

89   public void start() throws Exception JavaDoc {
90     logmon.log(BasicLevel.DEBUG, getName() + ", starting");
91     try {
92       nbMaxCnx = AgentServer.getInteger(getName() + ".nbMaxCnx").intValue();
93     } catch (Exception JavaDoc exc) {
94       try {
95     nbMaxCnx = AgentServer.getInteger("PoolNetwork.nbMaxCnx").intValue();
96       } catch (Exception JavaDoc exc2) {
97     nbMaxCnx = 5;
98       }
99     }
100
101     try {
102       if (isRunning())
103     throw new IOException("Consumer already running.");
104
105       for (int i=0; i<sessions.length; i++) {
106         if (sessions[i] != null) sessions[i].init();
107       }
108
109       activeSessions = new NetSession[nbMaxCnx];
110
111       wakeOnConnection.start();
112       dispatcher.start();
113       watchDog.start();
114     } catch (Exception JavaDoc exc) {
115       logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);
116       throw exc;
117     }
118     logmon.log(BasicLevel.DEBUG, getName() + ", started");
119   }
120
121   /**
122    * Wakes up the watch-dog thread.
123    */

124   public void wakeup() {
125     if (watchDog != null) watchDog.wakeup();
126   }
127
128   /**
129    * Forces the network component to stop executing.
130    */

131   public void stop() {
132     if (wakeOnConnection != null) wakeOnConnection.stop();
133     if (dispatcher != null) dispatcher.stop();
134     if (watchDog != null) watchDog.stop();
135     for (int i=0; i<sessions.length; i++) {
136       // May be we can take in account only "active" sessions.
137
if (sessions[i]!= null) sessions[i].stop();
138     }
139     logmon.log(BasicLevel.DEBUG, getName() + ", stopped");
140   }
141
142   /**
143    * Tests if the network component is alive.
144    *
145    * @return true if this <code>MessageConsumer</code> is alive; false
146    * otherwise.
147    */

148   public boolean isRunning() {
149     if ((wakeOnConnection != null) && wakeOnConnection.isRunning() &&
150     (dispatcher != null) && dispatcher.isRunning() &&
151     (watchDog != null) && watchDog.isRunning())
152       return true;
153
154     return false;
155   }
156
157   final NetSession getSession(short sid) {
158     return sessions[index(sid)];
159   }
160
161   /**
162    * Returns a string representation of this consumer, including the
163    * daemon's name and status.
164    *
165    * @return A string representation of this consumer.
166    */

167   public String JavaDoc toString() {
168     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
169
170     strbuf.append(super.toString()).append("\n\t");
171     if (wakeOnConnection != null)
172       strbuf.append(wakeOnConnection.toString()).append("\n\t");
173     if (dispatcher != null)
174       strbuf.append(dispatcher.toString()).append("\n\t");
175     if (watchDog != null)
176       strbuf.append(watchDog.toString()).append("\n\t");
177     for (int i=0; i<sessions.length; i++) {
178       // May be we can take in account only "active" sessions.
179
if (sessions[i]!= null)
180     strbuf.append(sessions[i].toString()).append("\n\t");
181     }
182
183     return strbuf.toString();
184   }
185
186   final class MessageVector extends Vector {
187     public synchronized Message removeMessage(int stamp) {
188       Message msg = null;
189
190       modCount++;
191       for (int index=0 ; index<elementCount ; index++) {
192         try {
193           msg = (Message) elementData[index];
194         } catch (ClassCastException JavaDoc exc) {
195           continue;
196         }
197         if (msg.getStamp() == stamp) {
198           int j = elementCount - index - 1;
199           if (j > 0) {
200         System.arraycopy(elementData, index + 1, elementData, index, j);
201           }
202           elementCount--;
203           elementData[elementCount] = null; /* to let gc do its work */
204         
205           return msg;
206         }
207       }
208       throw new NoSuchElementException();
209     }
210   }
211
212   final class NetSession implements Runnable JavaDoc {
213     /** Destination server id */
214     private short sid;
215     /**
216      * Boolean variable used to stop the daemon properly. The dameon tests
217      * this variable between each request, and stops if it is false.
218      * @see start
219      * @see stop
220      */

221     private volatile boolean running = false;
222     /**
223      * True if the sessions can be stopped, false otherwise. A session can
224      * be stopped if it is waiting.
225      */

226     private boolean canStop = false;
227     /** The thread. */
228     private Thread JavaDoc thread = null;
229     /** The session's name. */
230     private String JavaDoc name = null;
231
232     /**
233      * True if a "local" connection is in progress, a local connection
234      * is initiated from this server to the remote one (defined by the
235      * {@link #server server} descriptor.
236      * This attribute is used to synchronize local and remote attempts to
237      * make connections.
238      */

239     private boolean local = false;
240
241     /**
242      * The description of the remote server handled by this network session.
243      */

244     private ServerDesc server;
245     /** The communication socket. */
246     private Socket sock = null;
247
248     MessageInputStream nis = null;
249     MessageOutputStream nos = null;
250
251     /** */
252     private MessageVector sendList;
253
254     private long last = 0L;
255
/**
 * Creates the session in charge of the given remote server. The session is
 * created stopped and unconnected; the state fields keep their declared
 * initial values.
 *
 * @param name the name of the enclosing network component, used as prefix
 *             of the session's name.
 * @param sid  the id of the remote server.
 */
NetSession(String name, short sid) {
  this.sid = sid;
  this.name = name + ".netSession#" + sid;

  if (logmon.isLoggable(BasicLevel.DEBUG))
    logmon.log(BasicLevel.DEBUG, getName() + ", created");

  sendList = new MessageVector();
}
269
270     void init() throws UnknownServerException {
271       server = AgentServer.getServerDesc(sid);
272     }
273
274     /**
275      * Returns this session's name.
276      *
277      * @return this session's name.
278      */

279     public final String JavaDoc getName() {
280       return name;
281     }
282
283     void start() {
284       if (logmon.isLoggable(BasicLevel.DEBUG))
285         logmon.log(BasicLevel.DEBUG, getName() + ", started");
286
287       long currentTimeMillis = System.currentTimeMillis();
288
289       if (((server.retry < WDNbRetryLevel1) &&
290        ((server.last + WDRetryPeriod1) < currentTimeMillis)) ||
291       ((server.retry < WDNbRetryLevel2) &&
292        ((server.last + WDRetryPeriod2) < currentTimeMillis)) ||
293       ((server.last + WDRetryPeriod3) < currentTimeMillis)) {
294     if (localStart()) {
295       startEnd();
296     } else {
297       server.last = currentTimeMillis;
298       server.retry += 1;
299     }
300       }
301     }
302
303     void start(Socket sock, int boot) {
304       if (logmon.isLoggable(BasicLevel.DEBUG))
305         logmon.log(BasicLevel.DEBUG, getName() + ", remotely started");
306
307       if (remoteStart(sock, boot)) startEnd();
308     }
309
310     /**
311      * Its method is called by <a HREF="#start()">start</a> in order to
312      * initiate a connection from the local server. The corresponding code
313      * on remote server is the method <a HREF="#remoteStart()">remoteStart</a>.
314      * Its method creates the socket, initiates the network connection, and
315      * negociates with remote server.<p><hr>
316      * Its method can be overidden in order to change the connection protocol
317      * (introduces authentification by example, or uses SSL), but must respect
318      * somes conditions:<ul>
319      * <li>send a Boot object after the initialization of object streams (it
320      * is waiting by the wakeOnConnection thread),
321      * <li>wait for an acknowledge,
322      * <li>set the sock, ois and oos attributes at the end if the connection
323      * is correct.
324      * </ul><p>
325      * In order to overide the protocol, we have to implements its method,
326      * with the remoteStart and the transmit methods.
327      *
328      * @return true if the connection is established, false otherwise.
329      */

330     boolean localStart() {
331       synchronized (this) {
332     if ((this.sock != null) || this.local) {
333       // The connection is already established, or a "local" connection
334
// is in progress (remoteStart method is synchronized).
335
// In all cases refuses the connection request.
336
if (logmon.isLoggable(BasicLevel.WARN))
337             logmon.log(BasicLevel.WARN, getName() + ", connection refused");
338       return false;
339     }
340
341     // Set the local attribute in order to block all others local attempts.
342
this.local = true;
343       }
344
345       Socket sock = null;
346       try {
347     sock = createSocket(server);
348     setSocketOption(sock);
349
350     writeBoot(sock.getOutputStream());
351         int boot = readAck(sock.getInputStream());
352
353         AgentServer.getTransaction().begin();
354         testBootTS(sid, boot);
355         AgentServer.getTransaction().commit();
356         AgentServer.getTransaction().release();
357
358         nis = new MessageInputStream(sock.getInputStream());
359         nos = new MessageOutputStream(sock.getOutputStream());
360       } catch (Exception JavaDoc exc) {
361         if (logmon.isLoggable(BasicLevel.WARN))
362           logmon.log(BasicLevel.WARN,
363                      getName() + ", connection refused.", exc);
364     // TODO: Try it later, may be a a connection is in progress...
365
try {
366       sock.getOutputStream().close();
367     } catch (Exception JavaDoc exc2) {}
368     try {
369       sock.getInputStream().close();
370     } catch (Exception JavaDoc exc2) {}
371     try {
372       sock.close();
373     } catch (Exception JavaDoc exc2) {}
374
375     // Reset the local attribute to allow future attempts.
376
this.local = false;
377         nis = null;
378         nos = null;
379
380     return false;
381       }
382
383       // Normally, only one thread can reach this code (*1), so we don't have
384
// to synchronized theses statements. First sets sock attribute, then
385
// releases the local lock.
386
// (*1) all local attempts are blocked and the remote side has already
387
// setup the connection (ACK reply).
388
this.sock = sock;
389       this.local = false;
390
391       return true;
392     }
393
394     /**
395      * Its method is called by <a HREF="start(java.net.Socket,
396      * java.io.ObjectInputStream, java.io.ObjectOutputStream">start</a>
397      * in order to reply to a connection request from a remote server.
398      * The corresponding code on remote server is the method
399      * <a HREF="#localStart()">localStart</a>.
400      *
401      * @param sock the connected socket
402      * @param ois the input stream
403      * @param oos the output stream
404      *
405      * @return true if the connection is established, false otherwise.
406      */

407     synchronized boolean remoteStart(Socket sock, int boot) {
408       try {
409     if ((this.sock != null) ||
410         (this.local && server.sid > AgentServer.getServerId()))
411       // The connection is already established, or a "local" connection
412
// is in progress from this server with a greater priority.
413
// In all cases, stops this "remote" attempt. If the "local"
414
// attempt has a lower priority, it will fail due to a remote
415
// reject.
416
throw new ConnectException("Already connected");
417
418     // Accept this connection.
419
if (logmon.isLoggable(BasicLevel.DEBUG))
420           logmon.log(BasicLevel.DEBUG, getName() + ", send AckStatus");
421
422         writeAck(sock.getOutputStream());
423
424         AgentServer.getTransaction().begin();
425         testBootTS(sid, boot);
426         AgentServer.getTransaction().commit();
427         AgentServer.getTransaction().release();
428
429         nis = new MessageInputStream(sock.getInputStream());
430         nos = new MessageOutputStream(sock.getOutputStream());
431
432     // Fixing sock attribute will prevent any future attempt
433
this.sock = sock;
434
435     return true;
436       } catch (Exception JavaDoc exc) {
437     // May be a a connection is in progress, try it later...
438
if (logmon.isLoggable(BasicLevel.WARN))
439           logmon.log(BasicLevel.WARN, getName() + ", connection refused", exc);
440
441     // Close the connection (# NACK).
442
try {
443       sock.getOutputStream().close();
444     } catch (Exception JavaDoc exc2) {}
445     try {
446       sock.getInputStream().close();
447     } catch (Exception JavaDoc exc2) {}
448     try {
449       sock.close();
450     } catch (Exception JavaDoc exc2) {}
451         nis = null;
452         nos = null;
453       }
454       return false;
455     }
456
457     /**
458      * The session is well initialized, we can start the server thread that
459      * "listen" the connected socket. If the maximum number of connections
460      * is reached, one connection from the pool is closed.
461      */

462     private void startEnd() {
463       server.active = true;
464       server.retry = 0;
465
466       synchronized(activeSessions) {
467     if (nbActiveCnx < nbMaxCnx) {
468       // Insert the current session in the active pool.
469
activeSessions[nbActiveCnx++] = this;
470     } else {
471       // Search the last recently used session in the pool.
472
long min = Long.MAX_VALUE;
473       int idx = -1;
474       for (int i=0; i<nbMaxCnx; i++) {
475         if (activeSessions[i].last < min) {
476           idx = i;
477           min = activeSessions[i].last;
478         }
479       }
480       // Kill choosed session and insert new one
481
activeSessions[idx].stop();
482       activeSessions[idx] = this;
483     }
484     last = current++;
485       }
486       thread = new Thread JavaDoc(this, getName());
487       thread.setDaemon(false);
488
489       running = true;
490       canStop = true;
491       thread.start();
492
493       if (logmon.isLoggable(BasicLevel.DEBUG))
494         logmon.log(BasicLevel.DEBUG, getName() + ", connection started");
495
496       // Try to send all waiting messages. As this.sock is no longer null
497
// so we must do a copy a waiting messages. New messages will be send
498
// directly in send method.
499
// Be careful, in a very limit case a message can be sent 2 times:
500
// added in sendList after sock setting and before array copy, il will
501
// be transmit in send method and below. However there is no problem,
502
// the copy will be discarded on remote node and 2 ack messages will
503
// be received on local node.
504
Object JavaDoc[] waiting = sendList.toArray();
505       logmon.log(BasicLevel.DEBUG,
506          getName() + ", send " + waiting.length + " waiting messages");
507
508       Message msg = null;
509       long currentTimeMillis = System.currentTimeMillis();
510       for (int i=0; i<waiting.length; i++) {
511         msg = (Message) waiting[i];
512         if ((msg.not != null) &&
513             (msg.not.expiration > 0) &&
514             (msg.not.expiration < currentTimeMillis)) {
515           if (logmon.isLoggable(BasicLevel.DEBUG))
516             logmon.log(BasicLevel.DEBUG,
517                        getName() + ": removes expired notification " +
518                        msg.from + ", " + msg.not);
519           try {
520             doAck(msg.getStamp());
521           } catch (IOException exc) {
522             logmon.log(BasicLevel.ERROR,
523                        getName() + ": cannot removes expired notification " +
524                        msg.from + ", " + msg.not, exc);
525           }
526         } else {
527           transmit(msg, currentTimeMillis);
528         }
529       }
530     }
531
532     /**
533      *
534      */

535     void stop() {
536       running = false;
537       
538       if (logmon.isLoggable(BasicLevel.DEBUG))
539         logmon.log(BasicLevel.DEBUG, getName() + ", stopped.");
540
541       while ((thread != null) && thread.isAlive()) {
542         if (canStop) {
543           if (thread.isAlive()) thread.interrupt();
544           shutdown();
545         }
546         try {
547           thread.join(1000L);
548         } catch (InterruptedException JavaDoc exc) {
549           continue;
550         }
551         thread = null;
552       }
553     }
554
555     public void shutdown() {
556       close();
557     }
558
559     synchronized void close() {
560       if (logmon.isLoggable(BasicLevel.DEBUG))
561         logmon.log(BasicLevel.DEBUG, getName() + ", closed.");
562
563       try {
564     sock.getInputStream().close();
565       } catch (Exception JavaDoc exc) {}
566       try {
567     sock.getOutputStream().close();
568       } catch (Exception JavaDoc exc) {}
569       try {
570     sock.close();
571       } catch (Exception JavaDoc exc) {}
572       sock = null;
573
574       nis = null;
575       nos = null;
576     }
577
578     /**
579      * Removes all messages in sendList previous to the ack'ed one.
580      * Be careful, messages in sendList are not always in stamp order.
581      * Its method should not be synchronized, it scans the list from
582      * begin to end, and it removes always the first element. Other
583      * methods using sendList just adds element at the end.
584      */

585     final private void doAck(int ack) throws IOException {
586       Message msg = null;
587
588       if (logmon.isLoggable(BasicLevel.DEBUG))
589         logmon.log(BasicLevel.DEBUG, getName() + ", ack received #" + ack);
590
591       try {
592         // Suppress the acknowledged notification from waiting list,
593
// and deletes it.
594
msg = sendList.removeMessage(ack);
595         AgentServer.getTransaction().begin();
596         msg.delete();
597         msg.free();
598         AgentServer.getTransaction().commit();
599         AgentServer.getTransaction().release();
600
601         if (logmon.isLoggable(BasicLevel.DEBUG))
602           logmon.log(BasicLevel.DEBUG,
603                      getName() + ", remove msg#" + msg.getStamp());
604       } catch (NoSuchElementException exc) {
605         logmon.log(BasicLevel.WARN,
606                    getName() + ", can't ack, unknown msg#" + ack);
607       }
608     }
609
610     /**
611      * Be careful, its method should not be synchronized (in that case, the
612      * overall synchronization of the connection -method start- can dead-lock).
613      */

614     final void send(Message msg) {
615       if (logmon.isLoggable(BasicLevel.DEBUG)) {
616         if (msg.not != null) {
617           logmon.log(BasicLevel.DEBUG,
618                      getName() + ", send msg#" + msg.getStamp());
619         } else {
620           logmon.log(BasicLevel.DEBUG,
621                      getName() + ", send ack#" + msg.getStamp());
622         }
623       }
624
625       long currentTimeMillis = System.currentTimeMillis();
626
627       if (msg.not != null) {
628         sendList.addElement(msg);
629
630         if ((msg.not.expiration > 0) &&
631             (msg.not.expiration < currentTimeMillis)) {
632           if (logmon.isLoggable(BasicLevel.DEBUG))
633             logmon.log(BasicLevel.DEBUG,
634                        getName() + ": removes expired notification " +
635                        msg.from + ", " + msg.not);
636           try {
637             doAck(msg.getStamp());
638           } catch (IOException exc) {
639             logmon.log(BasicLevel.ERROR,
640                        getName() + ": cannot removes expired notification " +
641                        msg.from + ", " + msg.not, exc);
642           }
643           return;
644         }
645       }
646
647       if (sock == null) {
648     // If there is no connection between local and destination server,
649
// try to make one!
650
start();
651       } else {
652     transmit(msg, currentTimeMillis);
653       }
654     }
655     
656     final private void ack(int stamp) throws Exception JavaDoc {
657       if (logmon.isLoggable(BasicLevel.DEBUG))
658           logmon.log(BasicLevel.DEBUG,
659                      getName() + ", set ack msg#" + stamp);
660
661       Message ack = Message.alloc(AgentId.localId,
662                                   AgentId.localId(server.sid),
663                                   null);
664       ack.source = AgentServer.getServerId();
665       ack.dest = AgentServer.getServerDesc(server.sid).gateway;
666       ack.stamp = stamp;
667
668       qout.push(ack);
669     }
670
671     final private synchronized void transmit(Message msg,
672                                              long currentTimeMillis) {
673       last = current++;
674       try {
675         nos.writeMessage(msg, currentTimeMillis);
676       } catch (IOException exc) {
677         logmon.log(BasicLevel.ERROR,
678                    getName() + ", exception in sending message", exc);
679     close();
680       } catch (NullPointerException JavaDoc exc) {
681         // The stream is closed, exits !
682
}
683     }
684
685     public void run() {
686       Message msg;
687
688       try {
689     while (running) {
690       canStop = true;
691
692           if (logmon.isLoggable(BasicLevel.DEBUG))
693             logmon.log(BasicLevel.DEBUG, getName() + ", waiting message");
694
695       try {
696         msg = nis.readMessage();
697       } catch (ClassNotFoundException JavaDoc exc) {
698         // TODO: In order to process it we have to return an error,
699
// but in that case me must identify the bad message...
700
logmon.log(BasicLevel.ERROR,
701                        getName() + ", error during waiting message", exc);
702         continue;
703       } catch (InvalidClassException exc) {
704         // TODO: In order to process it we have to return an error,
705
// but in that case me must identify the bad message..
706
logmon.log(BasicLevel.ERROR,
707                        getName() + ", error during waiting message", exc);
708         continue;
709       } catch (StreamCorruptedException exc) {
710         logmon.log(BasicLevel.ERROR,
711                        getName() + ", error during waiting message", exc);
712         break;
713       } catch (OptionalDataException exc) {
714         logmon.log(BasicLevel.ERROR,
715                        getName() + ", error during waiting message", exc);
716         break;
717       } catch (NullPointerException JavaDoc exc) {
718             // The stream is closed, exits !
719
break;
720           }
721
722       canStop = false;
723
724           if (logmon.isLoggable(BasicLevel.DEBUG))
725             logmon.log(BasicLevel.DEBUG, getName() + ", receives: " + msg);
726
727           // Keep message stamp in order to acknowledge it (be careful,
728
// the message get a new stamp to be delivered).
729
int stamp = msg.getStamp();
730           if (msg.not != null) {
731             deliver(msg);
732             ack(stamp);
733           } else {
734             doAck(stamp);
735           }
736     }
737       } catch (EOFException exc) {
738         if (running)
739           logmon.log(BasicLevel.WARN,
740                      this.getName() + ", connection closed", exc);
741       } catch (SocketException exc) {
742         if (running)
743           logmon.log(BasicLevel.WARN,
744                      this.getName() + ", connection closed", exc);
745       } catch (Exception JavaDoc exc) {
746     logmon.log(BasicLevel.ERROR, getName() + ", exited", exc);
747       } finally {
748     logmon.log(BasicLevel.DEBUG, getName() + ", ends");
749     running = false;
750     close();
751       }
752     }
753
754     /**
755      * Class used to read messages through a stream.
756      */

757     final class MessageInputStream extends ByteArrayInputStream {
758       private InputStream is = null;
759
760       MessageInputStream(InputStream is) {
761         super(new byte[256]);
762         this.is = is;
763       }
764
765       private void readFully(int length) throws IOException {
766         count = 0;
767         if (length > buf.length) buf = new byte[length];
768
769         int nb = -1;
770         do {
771           nb = is.read(buf, count, length-count);
772           if (logmon.isLoggable(BasicLevel.DEBUG))
773             logmon.log(BasicLevel.DEBUG, getName() + ", reads:" + nb);
774           if (nb < 0) throw new EOFException();
775           count += nb;
776         } while (count != length);
777         pos = 0;
778       }
779
780       Message readMessage() throws Exception JavaDoc {
781         count = 0;
782         readFully(Message.LENGTH +4 -1);
783         // Reads boot timestamp of source server
784
int length = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) +
785           ((buf[2] & 0xFF) << 8) + ((buf[3] & 0xFF) << 0);
786
787         Message msg = Message.alloc();
788         int idx = msg.readFromBuf(buf, 4);
789
790         if (length > idx) {
791           // Be careful, the buffer is resetted
792
readFully(length - idx);
793
794           // Reads notification attributes
795
boolean persistent = ((buf[0] & Message.PERSISTENT) == 0)?false:true;
796           boolean detachable = ((buf[0] & Message.DETACHABLE) == 0)?false:true;
797
798           pos = 1;
799           // Reads notification object
800
ObjectInputStream ois = new ObjectInputStream(this);
801           msg.not = (Notification) ois.readObject();
802           if (msg.not.expiration > 0)
803             msg.not.expiration += System.currentTimeMillis();
804           msg.not.persistent = persistent;
805           msg.not.detachable = detachable;
806           msg.not.detached = false;
807         } else {
808           msg.not = null;
809         }
810
811         return msg;
812       }
813     }
814
815     /**
816      * Class used to send messages through a stream.
817      */

818     final class MessageOutputStream extends ByteArrayOutputStream {
819       private OutputStream os = null;
820       private ObjectOutputStream oos = null;
821
822       MessageOutputStream(OutputStream os) throws IOException {
823         super(256);
824
825         this.os = os;
826         oos = new ObjectOutputStream(this);
827         count = 0;
828         buf[Message.LENGTH +4] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);
829         buf[Message.LENGTH +5] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);
830         buf[Message.LENGTH +6] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);
831         buf[Message.LENGTH +7] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);
832       }
833
834       void writeMessage(Message msg,
835                         long currentTimeMillis) throws IOException {
836         logmon.log(BasicLevel.DEBUG, getName() + ", sends " + msg);
837
838         int idx = msg.writeToBuf(buf, 4);
839         // Be careful, notification attribute are not written if there
840
// is no notification.
841
count = Message.LENGTH +4 -1;
842         
843         try {
844           if (msg.not != null) {
845             // Writes notification attributes
846
buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) |
847                                  (msg.not.detachable?Message.DETACHABLE:0));
848
849             // Be careful, the stream header is hard-written in buf
850
count = Message.LENGTH +8;
851
852             if (msg.not.expiration > 0)
853               msg.not.expiration -= currentTimeMillis;
854             oos.writeObject(msg.not);
855             
856             oos.reset();
857             oos.flush();
858           }
859
860           // Writes length at beginning
861
buf[0] = (byte) (count >>> 24);
862           buf[1] = (byte) (count >>> 16);
863           buf[2] = (byte) (count >>> 8);
864           buf[3] = (byte) (count >>> 0);
865
866           os.write(buf, 0, count);;
867           os.flush();
868         } finally {
869           if ((msg.not != null) && (msg.not.expiration > 0))
870             msg.not.expiration += currentTimeMillis;
871           count = 0;
872         }
873       }
874     }
875   }
876
877   final class WakeOnConnection extends Daemon {
878     ServerSocket listen = null;
879
880     WakeOnConnection(String JavaDoc name, Logger logmon) throws IOException {
881       super(name + ".wakeOnConnection");
882       // creates a server socket listening on configured port
883
listen = createServerSocket();
884       // Overload logmon definition in Daemon
885
this.logmon = logmon;
886     }
887
888     protected void close() {
889       try {
890     listen.close();
891       } catch (Exception JavaDoc exc) {}
892       listen = null;
893     }
894
895     protected void shutdown() {
896       close();
897     }
898
899     /**
900      *
901      */

902     public void run() {
903       /** Connected socket. */
904       Socket sock = null;
905
906       try {
907     while (running) {
908       try {
909         canStop = true;
910
911         // Get the connection
912
try {
913               if (this.logmon.isLoggable(BasicLevel.DEBUG))
914                 this.logmon.log(BasicLevel.DEBUG,
915                            this.getName() + ", waiting connection");
916
917           sock = listen.accept();
918         } catch (IOException exc) {
919               if (running)
920                 this.logmon.log(BasicLevel.ERROR,
921                                 this.getName() +
922                                 ", error during waiting connection", exc);
923           continue;
924         }
925         canStop = false;
926
927         setSocketOption(sock);
928
929             Boot boot = readBoot(sock.getInputStream());
930             if (this.logmon.isLoggable(BasicLevel.DEBUG))
931               this.logmon.log(BasicLevel.DEBUG,
932                               this.getName() + ", connection setup from #" +
933                               boot.sid);
934             getSession(boot.sid).start(sock, boot.boot);
935       } catch (Exception JavaDoc exc) {
936         this.logmon.log(BasicLevel.ERROR,
937                             this.getName() + ", bad connection setup", exc);
938       }
939     }
940       } finally {
941         finish();
942       }
943     }
944   }
945
946   final class Dispatcher extends Daemon {
947     Dispatcher(String JavaDoc name, Logger logmon) {
948       super(name + ".dispatcher");
949       // Overload logmon definition in Daemon
950
this.logmon = logmon;
951     }
952
953     protected void close() {}
954
955     protected void shutdown() {}
956
957     public void run() {
958       Message msg = null;
959       
960       try {
961         while (running) {
962           canStop = true;
963
964           if (this.logmon.isLoggable(BasicLevel.DEBUG))
965             this.logmon.log(BasicLevel.DEBUG,
966                             this.getName() + ", waiting message");
967           try {
968             msg = qout.get();
969           } catch (InterruptedException JavaDoc exc) {
970             continue;
971           }
972           canStop = false;
973           if (! running) break;
974
975           // Send the message
976
getSession(msg.getDest()).send(msg);
977           qout.pop();
978         }
979       } finally {
980         finish();
981       }
982     }
983   }
984
985   final class WatchDog extends Daemon {
986    /** Use to synchronize thread */
987     private Object JavaDoc lock;
988
989     WatchDog(String JavaDoc name, Logger logmon) {
990       super(name + ".watchdog");
991       lock = new Object JavaDoc();
992       // Overload logmon definition in Daemon
993
this.logmon = logmon;
994     }
995
996     protected void close() {}
997
998     protected void shutdown() {
999       wakeup();
1000    }
1001
1002    void wakeup() {
1003      synchronized (lock) {
1004    lock.notify();
1005      }
1006    }
1007
1008    public void run() {
1009      try {
1010        synchronized (lock) {
1011          while (running) {
1012            try {
1013              lock.wait(WDActivationPeriod);
1014              if (this.logmon.isLoggable(BasicLevel.DEBUG))
1015                this.logmon.log(BasicLevel.DEBUG,
1016                                this.getName() + ", activated");
1017            } catch (InterruptedException JavaDoc exc) {
1018              continue;
1019            }
1020
1021            if (! running) break;
1022
1023            for (int sid=0; sid<sessions.length; sid++) {
1024              if ((sessions[sid] != null) &&
1025                  (sessions[sid].sendList.size() > 0) &&
1026                  (! sessions[sid].running)) {
1027                sessions[sid].start();
1028              }
1029            }
1030          }
1031        }
1032      } finally {
1033        finish();
1034      }
1035    }
1036  }
1037
1038
1039  final void writeBoot(OutputStream out) throws IOException {
1040    if (logmon.isLoggable(BasicLevel.DEBUG))
1041      logmon.log(BasicLevel.DEBUG, getName() + ", writeBoot: " + getBootTS());
1042
1043    byte[] iobuf = new byte[6];
1044    iobuf[0] = (byte) (AgentServer.getServerId() >>> 8);
1045    iobuf[1] = (byte) (AgentServer.getServerId() >>> 0);
1046    iobuf[2] = (byte) (getBootTS() >>> 24);
1047    iobuf[3] = (byte) (getBootTS() >>> 16);
1048    iobuf[4] = (byte) (getBootTS() >>> 8);
1049    iobuf[5] = (byte) (getBootTS() >>> 0);
1050    out.write(iobuf);
1051    out.flush();
1052  }
1053  
/**
 * Plain holder for the two values decoded by <code>readBoot</code>.
 */
final class Boot {
  /** Server id, taken from the first 2 bytes of the boot record. */
  transient short sid;

  /** Boot value, taken from the last 4 bytes of the boot record. */
  transient int boot;
}
1058
1059  final void readFully(InputStream is, byte[] iobuf) throws IOException {
1060    int n = 0;
1061    do {
1062      int count = is.read(iobuf, n, iobuf.length - n);
1063      if (count < 0) throw new EOFException();
1064      n += count;
1065    } while (n < iobuf.length);
1066  }
1067
1068  final Boot readBoot(InputStream in) throws IOException {
1069    Boot boot = new Boot();
1070
1071    byte[] iobuf = new byte[6];
1072    readFully(in, iobuf);
1073    boot.sid = (short) (((iobuf[0] & 0xFF) << 8) + (iobuf[1] & 0xFF));
1074    boot.boot = ((iobuf[2] & 0xFF) << 24) + ((iobuf[3] & 0xFF) << 16) +
1075      ((iobuf[4] & 0xFF) << 8) + ((iobuf[5] & 0xFF) << 0);
1076
1077    if (logmon.isLoggable(BasicLevel.DEBUG))
1078      logmon.log(BasicLevel.DEBUG, getName() + ", readBoot from #" + boot.sid +
1079                 " -> " + boot.boot);
1080
1081    return boot;
1082  }
1083  
1084  final void writeAck(OutputStream out) throws IOException {
1085    if (logmon.isLoggable(BasicLevel.DEBUG))
1086      logmon.log(BasicLevel.DEBUG, getName() + ", writeAck: " + getBootTS());
1087
1088    byte[] iobuf = new byte[4];
1089    iobuf[0] = (byte) (getBootTS() >>> 24);
1090    iobuf[1] = (byte) (getBootTS() >>> 16);
1091    iobuf[2] = (byte) (getBootTS() >>> 8);
1092    iobuf[3] = (byte) (getBootTS() >>> 0);
1093    out.write(iobuf);
1094    out.flush();
1095  }
1096  
1097  final int readAck(InputStream in)throws IOException {
1098    byte[] iobuf = new byte[4];
1099    readFully(in, iobuf);
1100    int boot = ((iobuf[0] & 0xFF) << 24) + ((iobuf[1] & 0xFF) << 16) +
1101      ((iobuf[2] & 0xFF) << 8) + ((iobuf[3] & 0xFF) << 0);
1102
1103    if (logmon.isLoggable(BasicLevel.DEBUG))
1104      logmon.log(BasicLevel.DEBUG, getName() + ", readAck:" + boot);
1105
1106    return boot;
1107  }
1108}
1109
Popular Tags