KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > NGNetwork


1 /*
2  * Copyright (C) 2003 - 2005 ScalAgent Distributed Technologies
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17  * USA.
18  */

19 package fr.dyade.aaa.agent;
20
21 import java.io.*;
22 import java.net.*;
23 import java.util.*;
24
25 import java.nio.*;
26 import java.nio.channels.*;
27
28 import org.objectweb.util.monolog.api.BasicLevel;
29 import org.objectweb.util.monolog.api.Logger;
30
31 import fr.dyade.aaa.util.Daemon;
32
33 /**
34  * <code>NGNetwork</code> is a new implementation of <code>Network</code>
35  * class using nio package.
36  */

37 public class NGNetwork extends StreamNetwork {
38   final static int Kb = 1024;
39   final static int Mb = 1024 * Kb;
40
41   final static int SO_BUFSIZE = 64 * Kb;
42
43   Selector selector = null;
44
45   Dispatcher dispatcher = null;
46   NetServer dmon[] = null;
47
48   // Be careful, NbNetServer>1 cause malfunctioning (slowness, etc.)
49
final static int NbNetServer = 1;
50
51   CnxHandler[] handlers = null;
52
53   /**
54    * Creates a new network component.
55    */

/**
 * Builds an unconfigured network component; the real set-up is done
 * later by <code>init</code>.
 */
public NGNetwork() {
  super();
}
59
60   /**
61    * Initializes a new network component. This method is used in order to
62    * easily create and configure a Network component from a class name.
63    * So we can use the <code>Class.newInstance()</code> method to create
64    * (without any parameter) the component, then we can initialize it with
65    * this method.<br>
66    * This method initializes the logical clock for the domain.
67    *
68    * @param name The domain name.
69    * @param port The listen port.
70    * @param servers The list of servers directly accessible from this
71    * network interface.
72    */

73   public void init(String JavaDoc name, int port, short[] servers) throws Exception JavaDoc {
74     super.init(name, port, servers);
75
76     // Creates a connection handler for each domain's server.
77
handlers = new CnxHandler[servers.length];
78     for (int i=0; i<servers.length; i++) {
79       if (servers[i] != AgentServer.getServerId())
80         handlers[i] = new CnxHandler(getName(), servers[i]);
81     }
82   }
83
84   ServerSocketChannel listen = null;
85
86   void open() throws IOException {
87     // Create a blocking server socket channel on port
88
listen = ServerSocketChannel.open();
89     listen.configureBlocking(false);
90     listen.socket().bind(new InetSocketAddress(port));
91
92     // Register channels with selector
93
listen.register(selector, SelectionKey.OP_ACCEPT);
94   }
95
96   void close() {
97     try {
98       listen.close();
99     } catch (Exception JavaDoc exc) {}
100     listen = null;
101   }
102
103   /**
104    * Causes this network component to begin execution.
105    */

106   public void start() throws Exception JavaDoc {
107     try {
108       logmon.log(BasicLevel.DEBUG, getName() + ", starting");
109
110       // Create a selector
111
selector = Selector.open();
112       // Creates a connection handler for each domain's server.
113
for (int i=0; i<handlers.length; i++) {
114         if (handlers[i] != null) handlers[i].init();
115       }
116       open();
117
118       if (dispatcher == null)
119         dispatcher = new Dispatcher(getName(), logmon);
120
121       if (dmon == null) {
122         dmon = new NetServer[NbNetServer];
123         for (int i=0; i<NbNetServer; i++) {
124           dmon[i] = new NetServer(getName(), logmon);
125         }
126       }
127
128       if (! dispatcher.isRunning()) dispatcher.start();
129       for (int i=0; i<NbNetServer; i++) {
130         if (! dmon[i].isRunning()) dmon[i].start();
131       }
132     } catch (IOException exc) {
133       logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);
134       throw exc;
135     }
136     logmon.log(BasicLevel.DEBUG, getName() + ", started");
137   }
138
139   final CnxHandler getHandler(short sid) {
140     return handlers[index(sid)];
141   }
142
143 // /**
144
// * Adds a message in "ready to deliver" list. This method allocates a
145
// * new time stamp to the message ; be Careful, changing the stamp imply
146
// * the filename change too.
147
// */
148
// public void post(Message msg) throws Exception {
149
// short to = AgentServer.getServerDesc(msg.to.to).gateway;
150
// // Allocates a new timestamp. Be careful, if the message needs to be
151
// // routed we have to use the next destination in timestamp generation.
152
// msg.source = AgentServer.getServerId();
153
// msg.dest = to;
154
// msg.stamp = getSendUpdate(to);
155
// // Saves the message.
156
// msg.save();
157
// // Push it in "ready to deliver" queue.
158
// getHandler(to).send(msg);
159
// }
160

161   /**
162    * Wakes up the watch-dog thread.
163    */

164   public void wakeup() {
165     if (selector != null) selector.wakeup();
166     logmon.log(BasicLevel.DEBUG, getName() + ", wakeup");
167   }
168
169   /**
170    * Forces the network component to stop executing.
171    */

172   public void stop() {
173     if (dispatcher != null) dispatcher.stop();
174     if (dmon != null) {
175       for (int i=0; i<NbNetServer; i++) {
176         if (dmon[i] != null) dmon[i].stop();
177       }
178     }
179     close();
180     logmon.log(BasicLevel.DEBUG, getName() + ", stopped");
181   }
182
183   /**
184    * Tests if the network component is alive.
185    *
186    * @return true if this <code>MessageConsumer</code> is alive; false
187    * otherwise.
188    */

189   public boolean isRunning() {
190     if ((dispatcher == null) || ! dispatcher.isRunning())
191       return false;
192
193     if (dmon == null)
194       return false;
195
196     for (int i=0; i<NbNetServer; i++) {
197       if ((dmon[i] == null) || ! dmon[i].isRunning())
198         return false;
199     }
200
201     return true;
202   }
203
204   /**
205    * Returns a string representation of this consumer, including the
206    * daemon's name and status.
207    *
208    * @return A string representation of this consumer.
209    */

210   public String JavaDoc toString() {
211     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
212
213     strbuf.append(super.toString()).append("\n\t");
214     if (dispatcher != null)
215       strbuf.append(dispatcher.toString()).append("\n");
216     for (int i=0; i<NbNetServer; i++) {
217       if ((dmon != null) && (dmon[i] != null))
218         strbuf.append(dmon[i].toString()).append("\n");
219     }
220
221     return strbuf.toString();
222   }
223
224   void cnxStart(SocketChannel channel) throws IOException {
225     if (logmon.isLoggable(BasicLevel.DEBUG))
226       logmon.log(BasicLevel.DEBUG, getName() + ", remotely started");
227
228     channel.socket().setSendBufferSize(SO_BUFSIZE);
229     channel.socket().setReceiveBufferSize(SO_BUFSIZE);
230     if (logmon.isLoggable(BasicLevel.DEBUG))
231       logmon.log(BasicLevel.DEBUG, getName() + " bufsize: " +
232                  channel.socket().getReceiveBufferSize() + ", " +
233                  channel.socket().getSendBufferSize());
234     
235     ByteBuffer buf = ByteBuffer.allocate(6);
236     channel.read(buf);
237     buf.flip();
238     short sid = buf.getShort();
239     int boot = buf.getInt();
240       
241     CnxHandler cnx = getHandler(sid);
242     if (cnx.remoteStart(channel, boot)) cnx.startEnd();
243   }
244
245   final class Dispatcher extends Daemon {
246     Dispatcher(String JavaDoc name, Logger logmon) {
247       super(name + ".dispatcher");
248       // Overload logmon definition in Daemon
249
this.logmon = logmon;
250     }
251
252     protected void close() {}
253
254     protected void shutdown() {}
255
256     public void run() {
257       Message msg = null;
258       
259       try {
260         while (running) {
261           canStop = true;
262
263           if (this.logmon.isLoggable(BasicLevel.DEBUG))
264             this.logmon.log(BasicLevel.DEBUG,
265                             this.getName() + ", waiting message");
266           try {
267             msg = qout.get();
268           } catch (InterruptedException JavaDoc exc) {
269             continue;
270           }
271           canStop = false;
272           if (! running) break;
273
274           try {
275             // Send the message
276
getHandler(msg.getDest()).send(msg);
277           } catch (IOException exc) {
278             if (this.logmon.isLoggable(BasicLevel.ERROR))
279               this.logmon.log(BasicLevel.ERROR, this.getName(), exc);
280           }
281           qout.pop();
282         }
283       } finally {
284         finish();
285       }
286     }
287   }
288
289   final class NetServer extends Daemon {
290     NetServer(String JavaDoc name, Logger logmon) throws IOException {
291       super(name + ".NetServer");
292       // Overload logmon definition in Daemon
293
this.logmon = logmon;
294
295 // // Create a blocking server socket channel on port
296
// listen = ServerSocketChannel.open();
297
// listen.configureBlocking(false);
298
// listen.socket().bind(new InetSocketAddress(port));
299

300 // // Register channels with selector
301
// listen.register(selector, SelectionKey.OP_ACCEPT);
302
}
303
304     protected void close() {
305 // try {
306
// listen.close();
307
// } catch (Exception exc) {}
308
// listen = null;
309
}
310
311     protected void shutdown() {
312       close();
313     }
314
315     public void run() {
316       int nbop = 0;
317       CnxHandler cnx = null;
318
319       try {
320
321       while (running) {
322
323         // This call blocks until there is activity on one of the
324
// registered channels. This is the key method in non-blocking I/O.
325
canStop = true;
326         try {
327           if (logmon.isLoggable(BasicLevel.DEBUG))
328             logmon.log(BasicLevel.DEBUG, getName() + ", on select");
329           nbop = selector.select(WDActivationPeriod);
330           if (logmon.isLoggable(BasicLevel.DEBUG))
331             logmon.log(BasicLevel.DEBUG, getName() + ", on select:" + nbop);
332         } catch (IOException exc) {
333         }
334
335         for (int idx=0; idx<handlers.length; idx++) {
336           if (logmon.isLoggable(BasicLevel.DEBUG))
337             logmon.log(BasicLevel.DEBUG, getName() + ", " + handlers[idx]);
338
339           if ((handlers[idx] != null) &&
340               (handlers[idx].sendlist.size() > 0) &&
341               (handlers[idx].channel == null)) {
342             try {
343               handlers[idx].start();
344             } catch (IOException exc) {
345               this.logmon.log(BasicLevel.WARN,
346                               this.getName() + ", can't start cnx#" + idx,
347                               exc);
348             }
349           }
350         }
351
352         if (nbop == 0) continue;
353         canStop = false;
354
355         // Get list of selection keys with pending events, then process
356
// each key
357
Set keys = selector.selectedKeys();
358         for(Iterator it = keys.iterator(); it.hasNext(); ) {
359           if (! running) break;
360
361           // Get the selection key
362
SelectionKey key = (SelectionKey) it.next();
363           // Remove it from the list to indicate that it is being processed
364
it.remove();
365
366           if (logmon.isLoggable(BasicLevel.DEBUG))
367             logmon.log(BasicLevel.DEBUG,
368                        getName() + "(1): " + key + " -> " + key.interestOps());
369
370           logmon.log(BasicLevel.DEBUG,
371                      getName() + ":" +
372                      key.isValid() +
373                      key.isAcceptable() +
374                      key.isReadable() +
375                      key.isWritable());
376           try {
377             // Check if it's a connection request
378
if (key.isAcceptable()) {
379               if (logmon.isLoggable(BasicLevel.DEBUG))
380                 logmon.log(BasicLevel.DEBUG, getName() + " acceptable");
381
382               // Get channel with connection request (useless ?)
383
ServerSocketChannel server = (ServerSocketChannel) key.channel();
384               
385               // Accept the connection request.
386
SocketChannel channel = server.accept();
387
388               // Register the channel with selector, listening for all events
389
cnxStart(channel);
390             } else {
391               cnx = (CnxHandler) key.attachment();
392               
393               if (logmon.isLoggable(BasicLevel.DEBUG))
394                 logmon.log(BasicLevel.DEBUG,
395                            getName() + ": " + key + " -> " + cnx);
396               // Since the ready operations are cumulative,
397
// need to check readiness for each operation
398
if (key.isValid() && key.isReadable()) {
399                 if (logmon.isLoggable(BasicLevel.DEBUG))
400                   logmon.log(BasicLevel.DEBUG, getName() + " readable");
401                 cnx.read();
402               }
403               if (key.isValid() && key.isWritable()) {
404                 if (logmon.isLoggable(BasicLevel.DEBUG))
405                   logmon.log(BasicLevel.DEBUG, getName() + " writable");
406                 cnx.write();
407               } else if (cnx.sendlist.size() > 0) {
408                 logmon.log(BasicLevel.FATAL, getName() + " force");
409                 key.interestOps(key.channel().validOps());
410               }
411             }
412             if (logmon.isLoggable(BasicLevel.DEBUG))
413               logmon.log(BasicLevel.DEBUG, getName() + "(2): " +
414                          key + " -> " + key.interestOps());
415
416           } catch (Exception JavaDoc exc) {
417             logmon.log(BasicLevel.ERROR, getName(), exc);
418             // Handle error with channel and unregister
419
try {
420               cnx.close();
421             } catch (IOException exc2) {
422               logmon.log(BasicLevel.ERROR, getName(), exc2);
423             }
424           }
425         }
426       }
427       } catch (Throwable JavaDoc exc) {
428         logmon.log(BasicLevel.FATAL, getName(), exc);
429       }
430     }
431   }
432
433   class CnxHandler {
434     /** Destination server id */
435     private short sid;
436     /** The handler's name. */
437     private String JavaDoc name = null;
438     /**
439      * True if a "local" connection is in progress, a local connection
440      * is initiated from this server to the remote one (defined by the
441      * {@link #server server} descriptor.
442      * This attribute is used to synchronize local and remote attempts to
443      * make connections.
444      */

445     private boolean local = false;
446     /** The description of the remote server handled by this network session */
447     private ServerDesc server;
448     /** The communication socket channel */
449     SocketChannel channel = null;
450
451     /** Date of last connection attempt */
452     long lasttry = 0L;
453
454     /** Informations for output */
455     int nbwrite = 0;
456     MessageOutputStream mos = null;
457     ByteBuffer bufout = null;
458
459     /** FIFO list of all messages to be sent */
460     MessageVector sendlist = null;
461
462     /** Informations for input */
463     ByteBuffer bufin = null;
464     MessageInputStream mis = null;
465
/**
 * Creates the handler of the connection with a remote server and
 * allocates its input and output data structures.
 *
 * @param name the name of the network component, used as prefix of the
 *             handler's name.
 * @param sid  the id of the remote server.
 */
CnxHandler(String name, short sid) throws IOException {
  this.sid = sid;
  this.name = name + ".cnxHandler#" + sid;

  if (logmon.isLoggable(BasicLevel.DEBUG)) {
    logmon.log(BasicLevel.DEBUG, getName() + ", created");
  }

  // Output side.
  this.mos = new MessageOutputStream();
  // Input side.
  this.bufin = ByteBuffer.allocateDirect(SO_BUFSIZE);
  this.mis = new MessageInputStream();

  this.sendlist = new MessageVector();
}
479
480     void init() throws IOException, UnknownServerException {
481       server = AgentServer.getServerDesc(sid);
482       if (sendlist.size() > 0) start();
483     }
484
485     /**
486      * Returns this session's name.
487      *
488      * @return this session's name.
489      */

490     public final String JavaDoc getName() {
491       return name;
492     }
493
494     void start() throws IOException {
495       if (logmon.isLoggable(BasicLevel.DEBUG))
496         logmon.log(BasicLevel.DEBUG, getName() + ", try to start");
497
498       long currentTimeMillis = System.currentTimeMillis();
499
500       if (server == null)
501         // probalby the Consumer is initialized but not always started
502
return;
503
504       if (((server.retry < WDNbRetryLevel1) &&
505        ((server.last + WDRetryPeriod1) < currentTimeMillis)) ||
506       ((server.retry < WDNbRetryLevel2) &&
507        ((server.last + WDRetryPeriod2) < currentTimeMillis)) ||
508       ((server.last + WDRetryPeriod3) < currentTimeMillis)) {
509     if (localStart()) {
510       startEnd();
511     } else {
512       server.last = currentTimeMillis;
513       server.retry += 1;
514     }
515       }
516     }
517
518     /**
519      * This method is called by <a HREF="#start()">start</a> in order to
520      * initiate a connection from the local server. The corresponding code
521      * on remote server is the method <a HREF="#remoteStart()">remoteStart</a>.
522      * This method creates the socket, initiates the network connection, and
523      * negotiates with the remote server.<p><hr>
524      * This method can be overridden in order to change the connection protocol
525      * (to introduce authentication, for example, or to use SSL), but must respect
526      * some conditions:<ul>
527      * <li>send a Boot object after the initialization of object streams (it
528      * is awaited by the wakeOnConnection thread),
529      * <li>wait for an acknowledgement,
530      * <li>set the sock, ois and oos attributes at the end if the connection
531      * is correct.
532      * </ul><p>
533      * In order to override the protocol, we have to implement this method,
534      * together with the remoteStart and the transmit methods.
535      *
536      * @return true if the connection is established, false otherwise.
537      */

538     boolean localStart() {
539       synchronized (this) {
540     if ((this.channel != null) || this.local) {
541       // The connection is already established, or a "local" connection
542
// is in progress (remoteStart method is synchronized).
543
// In all cases refuses the connection request.
544
if (logmon.isLoggable(BasicLevel.WARN))
545             logmon.log(BasicLevel.WARN, getName() + ", connection refused");
546       return false;
547     }
548
549     // Set the local attribute in order to block all others local attempts.
550
this.local = true;
551       }
552
553       SocketChannel channel = null;
554       try {
555         SocketAddress addr = new InetSocketAddress(server.getAddr(),
556                                                    server.getPort());
557         channel = SocketChannel.open(addr);
558
559         channel.socket().setSendBufferSize(SO_BUFSIZE);
560         channel.socket().setReceiveBufferSize(SO_BUFSIZE);
561         if (logmon.isLoggable(BasicLevel.DEBUG))
562           logmon.log(BasicLevel.DEBUG, getName() + " bufsize: " +
563                      channel.socket().getReceiveBufferSize() + ", " +
564                      channel.socket().getSendBufferSize());
565
566         if (logmon.isLoggable(BasicLevel.DEBUG))
567           logmon.log(BasicLevel.DEBUG,
568                      getName() + ", writeBoot: " + getBootTS());
569
570         ByteBuffer buf = ByteBuffer.allocate(6);
571         buf.putShort(AgentServer.getServerId());
572         buf.putInt(getBootTS());
573         buf.flip();
574         channel.write(buf);
575
576 // ByteBuffer buf = ByteBuffer.allocate(6);
577
buf.flip();
578         if (channel.read(buf) > 0) {
579           // Reads the message length
580
buf.flip();
581           int boot = buf.getInt();
582
583           AgentServer.getTransaction().begin();
584           testBootTS(sid, boot);
585           AgentServer.getTransaction().commit();
586           AgentServer.getTransaction().release();
587         } else {
588           throw new ConnectException("Can't get status");
589         }
590       } catch (Exception JavaDoc exc) {
591         if (logmon.isLoggable(BasicLevel.WARN))
592           logmon.log(BasicLevel.WARN,
593                      getName() + ", connection refused.", exc);
594     // TODO: Try it later, may be a a connection is in progress...
595
try {
596       channel.close();
597     } catch (Exception JavaDoc exc2) {}
598
599     // Reset the local attribute to allow future attempts.
600
this.local = false;
601     return false;
602       }
603
604       // Normally, only one thread can reach this code (*1), so we don't have
605
// to synchronized theses statements. First sets sock attribute, then
606
// releases the local lock.
607
// (*1) all local attempts are blocked and the remote side has already
608
// setup the connection (ACK reply).
609
this.channel = channel;
610       this.local = false;
611
612       return true;
613     }
614
615     /**
616      * This method is called by <code>NGNetwork.cnxStart</code>,
617      * once the connection request has been read,
618      * in order to reply to a connection request from a remote server.
619      * The corresponding code on remote server is the method
620      * <a HREF="#localStart()">localStart</a>.
621      *
622      * @param channel the connected socket channel
623      * @param boot the boot timestamp read from the remote server's
624      * connection request
625      *
626      * @return true if the connection is established, false otherwise.
627      */

628     synchronized boolean remoteStart(SocketChannel channel, int boot) {
629       try {
630     if ((this.channel != null) ||
631         (this.local && server.sid > AgentServer.getServerId())) {
632       // The connection is already established, or
633
// a "local" connection is in progress from this server with a
634
// greater priority.
635
// In all cases, stops this "remote" attempt.
636
// If the "local" attempt has a lower priority, it will fail
637
// due to a remote reject.
638
throw new ConnectException("Already connected");
639         }
640
641     // Accept this connection.
642
if (logmon.isLoggable(BasicLevel.DEBUG))
643           logmon.log(BasicLevel.DEBUG,
644                      getName() + ", writeBoot: " + getBootTS());
645
646         ByteBuffer buf = ByteBuffer.allocate(4);
647         buf.putInt(getBootTS());
648         buf.flip();
649         channel.write(buf);
650
651         AgentServer.getTransaction().begin();
652         testBootTS(sid, boot);
653         AgentServer.getTransaction().commit();
654         AgentServer.getTransaction().release();
655
656     // Fixing sock attribute will prevent any future attempt
657
this.channel = channel;
658     return true;
659       } catch (Exception JavaDoc exc) {
660     // May be a a connection is in progress, try it later...
661
if (logmon.isLoggable(BasicLevel.WARN))
662           logmon.log(BasicLevel.WARN,
663                          getName() + ", connection refused", exc);
664
665     // Close the connection (# NACK).
666
try {
667       channel.close();
668     } catch (Exception JavaDoc exc2) {}
669       }
670       return false;
671     }
672
673     /**
674      * The session is well initialized, we can start the server thread that
675      * "listen" the connected socket. If the maximum number of connections
676      * is reached, one connection from the pool is closed.
677      */

678     private void startEnd() throws IOException {
679       server.active = true;
680       server.retry = 0;
681
682 // mos = new MessageOutputStream();
683
nbwrite = 0;
684 // bufin = ByteBuffer.allocateDirect(SO_BUFSIZE);
685
bufin.clear();
686 // mis = new MessageInputStream();
687

688       // The returned channel is in blocking mode.
689
channel.configureBlocking(false);
690       // Register channels with selector
691
channel.register(selector, channel.validOps(), this);
692
693       if (logmon.isLoggable(BasicLevel.DEBUG))
694         logmon.log(BasicLevel.DEBUG,
695                          getName() + ", connection started");
696
697       sendlist.reset();
698     }
699
700     synchronized void send(Message msg) throws IOException {
701       if (logmon.isLoggable(BasicLevel.DEBUG))
702         logmon.log(BasicLevel.DEBUG,
703                          getName() + ", send message: " + msg);
704
705       // Adds it to the list of message to be sent, it will be removed
706
// after its ack receipt.
707
sendlist.addMessage(msg);
708
709       if ((channel != null) && (bufout == null)) {
710         // As no message are actually sending the channel is only subscribe
711
// for reading, subscribe this channel for write operation will permit
712
// to transmit the new added message.
713

714     // Be careful, as this change is only take in account for the next
715
// select operation, we have to use wakeup on selector
716

717         SelectionKey key = channel.keyFor(selector);
718         if (logmon.isLoggable(BasicLevel.DEBUG))
719           logmon.log(BasicLevel.DEBUG,
720                          getName() + ", send message, key=" + key);
721         if (key != null)
722           key.interestOps(channel.validOps());
723       }
724
725       // In all case a selector.wakeup() will solve the problem !!
726
if (selector == null) {
727         logmon.log(BasicLevel.WARN,
728                    getName() + ", network not started.");
729       } else {
730         selector.wakeup();
731       }
732     }
733
734     /**
735      * Class used to send messages through a stream.
736      */

737     final class MessageOutputStream extends ByteArrayOutputStream {
738       private ObjectOutputStream oos = null;
739
740       MessageOutputStream() throws IOException {
741         super(256);
742
743         oos = new ObjectOutputStream(this);
744
745         count = 0;
746         buf[29] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);
747         buf[30] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);
748         buf[31] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);
749         buf[32] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);
750       }
751
752       void writeMessage(Message msg) throws IOException {
753         logmon.log(BasicLevel.DEBUG, getName() + ", writes " + msg);
754
755         // Writes sender's AgentId
756
buf[4] = (byte) (msg.from.from >>> 8);
757         buf[5] = (byte) (msg.from.from >>> 0);
758         buf[6] = (byte) (msg.from.to >>> 8);
759         buf[7] = (byte) (msg.from.to >>> 0);
760         buf[8] = (byte) (msg.from.stamp >>> 24);
761         buf[9] = (byte) (msg.from.stamp >>> 16);
762         buf[10] = (byte) (msg.from.stamp >>> 8);
763         buf[11] = (byte) (msg.from.stamp >>> 0);
764         // Writes adressee's AgentId
765
buf[12] = (byte) (msg.to.from >>> 8);
766         buf[13] = (byte) (msg.to.from >>> 0);
767         buf[14] = (byte) (msg.to.to >>> 8);
768         buf[15] = (byte) (msg.to.to >>> 0);
769         buf[16] = (byte) (msg.to.stamp >>> 24);
770         buf[17] = (byte) (msg.to.stamp >>> 16);
771         buf[18] = (byte) (msg.to.stamp >>> 8);
772         buf[19] = (byte) (msg.to.stamp >>> 0);
773         // Writes source server id of message
774
buf[20] = (byte) (msg.source >>> 8);
775         buf[21] = (byte) (msg.source >>> 0);
776         // Writes destination server id of message
777
buf[22] = (byte) (msg.dest >>> 8);
778         buf[23] = (byte) (msg.dest >>> 0);
779         // Writes stamp of message
780
buf[24] = (byte) (msg.stamp >>> 24);
781         buf[25] = (byte) (msg.stamp >>> 16);
782         buf[26] = (byte) (msg.stamp >>> 8);
783         buf[27] = (byte) (msg.stamp >>> 0);
784         count = 28;
785
786         if (msg.not != null) {
787           // Writes notification attributes
788
buf[28] = (byte) ((msg.not.persistent?0x01:0) |
789                             (msg.not.detachable?0x10:0));
790           // Be careful, the stream header is hard-written in buf[29..32]
791
count = 33;
792
793           oos.writeObject(msg.not);
794           oos.reset();
795           oos.flush();
796         }
797
798         // Writes length at beginning
799
buf[0] = (byte) (count >>> 24);
800         buf[1] = (byte) (count >>> 16);
801         buf[2] = (byte) (count >>> 8);
802         buf[3] = (byte) (count >>> 0);
803         
804         logmon.log(BasicLevel.DEBUG, getName() + ", writes " + count);
805
806         nbwrite = count;
807         bufout = ByteBuffer.wrap(buf, 0, count);
808         nbwrite -= channel.write(bufout);
809       }
810     }
811
812 // void write(Message msg) throws IOException {
813
// mos.writeMessage(msg);
814
// }
815

816     /**
817      * Method called each time the channel is Writable
818      */

819     private synchronized void write() throws IOException {
820       if (logmon.isLoggable(BasicLevel.DEBUG))
821         logmon.log(BasicLevel.DEBUG, getName() + " write-1");
822
823       // test if there is still bytes to write
824
if ((bufout != null) && (nbwrite > 0)) {
825         if (logmon.isLoggable(BasicLevel.DEBUG))
826           logmon.log(BasicLevel.DEBUG, getName() + " write-2");
827         nbwrite -= channel.write(bufout);
828       } else {
829         if (nbwrite == 0) {
830           if (logmon.isLoggable(BasicLevel.DEBUG))
831             logmon.log(BasicLevel.DEBUG, getName() + " write-3");
832 // if (bufout != null) {
833
// // end of message sending, if it is an acknowledge remove it
834
// // from sendlist else wait for ack.
835
// if (sendlist.currentMessage().not == null) {
836
// logmon.log(BasicLevel.DEBUG, getName() + " remove ack sent");
837
// sendlist.removeCurrent();
838
// }
839
// }
840
Message msg = sendlist.nextMessage();
841           if (msg == null) {
842             bufout = null;
843             // There is no more message to send, unsubscribe this channel
844
// for write operation.
845
if (logmon.isLoggable(BasicLevel.DEBUG))
846               logmon.log(BasicLevel.DEBUG, getName() + " write-4x:" + msg);
847             channel.register(selector, SelectionKey.OP_READ, this);
848           } else {
849             if (logmon.isLoggable(BasicLevel.DEBUG))
850               logmon.log(BasicLevel.DEBUG, getName() + " write-4:" + msg);
851             mos.writeMessage(msg);
852             if (msg.not == null) {
853               logmon.log(BasicLevel.DEBUG, getName() + " remove ack sent");
854               sendlist.removeCurrent();
855             }
856           }
857         }
858       }
859     }
860
861     /**
862      * Method called each time the channel is Readable
863      */

864     private synchronized void read() throws Exception JavaDoc {
865       int bytes = channel.read(bufin);
866
867       if (logmon.isLoggable(BasicLevel.DEBUG))
868         logmon.log(BasicLevel.DEBUG, getName() + " reads: " + bytes);
869
870       if (bytes == 0) return;
871
872       if (bytes < 0) {
873         if (logmon.isLoggable(BasicLevel.DEBUG))
874           logmon.log(BasicLevel.DEBUG, getName() + " cnx remotely closed");
875     close();
876     return;
877       }
878
879       bufin.flip();
880       while (bytes > 0) {
881 // logmon.log(BasicLevel.FATAL,
882
// "mis.getBuffer()=" + mis.getBuffer().length);
883
// logmon.log(BasicLevel.FATAL,
884
// "mis.getCount()=" + mis.getCount());
885
// logmon.log(BasicLevel.FATAL,
886
// "mis.length=" + mis.length);
887
// logmon.log(BasicLevel.FATAL,
888
// "bytes=" + bytes);
889

890     if (mis.length == -1) {
891 // if (mis.msg == null) {
892
// Reads the message header.
893
if ((mis.getCount() + bytes) < 28) {
894             bufin.get(mis.getBuffer(), mis.getCount(), bytes);
895         mis.setCount(mis.getCount() + bytes);
896         bytes = 0;
897       } else {
898         bufin.get(mis.getBuffer(), mis.getCount(), 28-mis.getCount());
899         bytes -= 28-mis.getCount();
900         mis.setCount(28);
901
902             Message msg = mis.readHeader();
903
904             if (mis.length == 28) {
905               if (logmon.isLoggable(BasicLevel.DEBUG))
906                 logmon.log(BasicLevel.DEBUG,
907                            getName() + ", ack received #" + msg.stamp);
908 // logmon.log(BasicLevel.FATAL, msg.toString());
909
doAck(msg.stamp);
910               msg.free();
911
912               // Reset data structures for next messages
913
mis.length = -1;
914               mis.msg = null;
915               mis.setCount(0);
916             }
917
918 // // Reads the message length
919
// try {
920
// for (; nbread <28; nbread++) {
921
// bufin.get(mis.buf, nbread, 28-nbread);
922
// bytes -= 1;
923
// }
924
// if (logmon.isLoggable(BasicLevel.DEBUG))
925
// logmon.log(BasicLevel.DEBUG, getName() + " get length: " + length);
926
// // Allocates byte array for message storage
927
// array = new byte[length];
928
// nbread = 0;
929
// } catch (BufferUnderflowException exc) {
930
// break;
931
}
932     } else {
933           // The message header is read, reads the notification if any.
934
if ((mis.getCount() + bytes) < (mis.length-28)) {
935         bufin.get(mis.getBuffer(), mis.getCount(), bytes);
936         mis.setCount(mis.getCount() + bytes);
937         bytes = 0;
938       } else {
939         bufin.get(mis.getBuffer(), mis.getCount(), mis.length-28-mis.getCount());
940         bytes -= mis.length-28-mis.getCount();
941         mis.setCount(mis.length-28);
942             
943             Message msg = mis.readMessage();
944
945             // Keep message stamp in order to acknowledge it (be careful,
946
// the message get a new stamp to be delivered).
947
int stamp = msg.getStamp();
948             if (logmon.isLoggable(BasicLevel.DEBUG))
949               logmon.log(BasicLevel.DEBUG,
950                          getName() + ", message received #" + stamp);
951 // logmon.log(BasicLevel.FATAL, msg.toString());
952
deliver(msg);
953             ack(stamp);
954               
955         // Reset data structures for next messages
956
mis.length = -1;
957             mis.msg = null;
958         mis.setCount(0);
959       }
960     }
961       }
962       bufin.clear();
963     }
964
965     /**
966      * Class used to read messages through a stream.
967      */

968     final class MessageInputStream extends ByteArrayInputStream {
969       int length = -1;
970       Message msg = null;
971
972       MessageInputStream() {
973         super(new byte[512]);
974         count = 0;
975       }
976
977       public void reset() {
978         super.reset();
979         length = -1;
980         msg = null;
981       }
982
983       byte[] getBuffer() {
984         return buf;
985       }
986
987       int getCount() {
988         return count;
989       }
990
991       void setCount(int count) {
992         this.count = count;
993       }
994
995       Message readHeader() throws Exception JavaDoc {
996         // Reads boot timestamp of source server
997
length = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) +
998           ((buf[2] & 0xFF) << 8) + ((buf[3] & 0xFF) << 0);
999
1000        msg = Message.alloc();
1001        // Reads sender's AgentId
1002
msg.from = new AgentId(
1003          (short) (((buf[4] & 0xFF) << 8) + (buf[5] & 0xFF)),
1004          (short) (((buf[6] & 0xFF) << 8) + (buf[7] & 0xFF)),
1005          ((buf[8] & 0xFF) << 24) + ((buf[9] & 0xFF) << 16) +
1006          ((buf[10] & 0xFF) << 8) + ((buf[11] & 0xFF) << 0));
1007        // Reads adressee's AgentId
1008
msg.to = new AgentId(
1009          (short) (((buf[12] & 0xFF) << 8) + (buf[13] & 0xFF)),
1010          (short) (((buf[14] & 0xFF) << 8) + (buf[15] & 0xFF)),
1011          ((buf[16] & 0xFF) << 24) + ((buf[17] & 0xFF) << 16) +
1012          ((buf[18] & 0xFF) << 8) + ((buf[19] & 0xFF) << 0));
1013        // Reads source server id of message
1014
msg.source = (short) (((buf[20] & 0xFF) << 8) +
1015                              ((buf[21] & 0xFF) << 0));
1016        // Reads destination server id of message
1017
msg.dest = (short) (((buf[22] & 0xFF) << 8) +
1018                            ((buf[23] & 0xFF) << 0));
1019        // Reads stamp of message
1020
msg.stamp = ((buf[24] & 0xFF) << 24) +
1021          ((buf[25] & 0xFF) << 16) +
1022          ((buf[26] & 0xFF) << 8) +
1023          ((buf[27] & 0xFF) << 0);
1024
1025        if ((length -28) > buf.length)
1026          buf = new byte[length -28];
1027
1028        count = 0;
1029
1030        return msg;
1031      }
1032
1033      Message readMessage() throws Exception JavaDoc {
1034        if (length > 28) {
1035          // Reads notification attributes
1036
boolean persistent = ((buf[28] & 0x01) == 0x01) ? true : false;
1037          boolean detachable = ((buf[28] & 0x10) == 0x10) ? true : false;
1038          pos = 1;
1039          // Reads notification object
1040
ObjectInputStream ois = new ObjectInputStream(this);
1041          msg.not = (Notification) ois.readObject();
1042          msg.not.persistent = persistent;
1043          msg.not.detachable = detachable;
1044          msg.not.detached = false;
1045        } else {
1046          msg.not = null;
1047        }
1048
1049        return msg;
1050      }
1051    }
1052
1053    /**
1054     * Removes all messages in sendList previous to the ack'ed one.
1055     * Be careful, messages in sendList are not always in stamp order.
1056     * Its method should not be synchronized, it scans the list from
1057     * begin to end, and it removes always the first element. Other
1058     * methods using sendList just adds element at the end.
1059     */

1060    final private void doAck(int ack) throws IOException {
1061      Message msg = null;
1062      try {
1063        // Suppress the acknowledged notification from waiting list,
1064
// and deletes it.
1065
msg = sendlist.removeMessage(ack);
1066        AgentServer.getTransaction().begin();
1067        msg.delete();
1068        msg.free();
1069        AgentServer.getTransaction().commit();
1070        AgentServer.getTransaction().release();
1071
1072        if (logmon.isLoggable(BasicLevel.DEBUG))
1073          logmon.log(BasicLevel.DEBUG,
1074                     getName() + ", remove msg#" + msg.getStamp());
1075      } catch (NoSuchElementException exc) {
1076        logmon.log(BasicLevel.WARN,
1077                   getName() + ", can't ack, unknown msg#" + ack);
1078      }
1079    }
1080
1081    final private void ack(int stamp) throws Exception JavaDoc {
1082      if (logmon.isLoggable(BasicLevel.DEBUG))
1083          logmon.log(BasicLevel.DEBUG,
1084                     getName() + ", set ack msg#" + stamp);
1085
1086      Message ack = Message.alloc(AgentId.localId,
1087                                  AgentId.localId(server.sid),
1088                                  null);
1089      ack.source = AgentServer.getServerId();
1090      ack.dest = AgentServer.getServerDesc(server.sid).gateway;
1091      ack.stamp = stamp;
1092
1093      send(ack);
1094    }
1095
1096    void close() throws IOException {
1097      if (logmon.isLoggable(BasicLevel.DEBUG))
1098          logmon.log(BasicLevel.DEBUG, getName() + ", close");
1099      try {
1100        channel.keyFor(selector).cancel();
1101      } catch (Exception JavaDoc exc) {}
1102      try {
1103        channel.close();
1104      } catch (Exception JavaDoc exc) {
1105        //
1106
} finally {
1107        channel = null;
1108      }
1109      nbwrite = 0;
1110      bufout = null;
1111    }
1112
1113    public String JavaDoc toString() {
1114      StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
1115      strbuf.append('(').append(super.toString());
1116      strbuf.append(',').append(name);
1117      strbuf.append(',').append(channel);
1118      strbuf.append(',').append(nbwrite);
1119      strbuf.append(',').append(sendlist).append(')');
1120      return strbuf.toString();
1121    }
1122  }
1123
1124  final class MessageVector {
1125    /**
1126     * The array buffer into which the components of the vector are
1127     * stored. The capacity of the vector is the length of this array buffer,
1128     * and is at least large enough to contain all the vector's elements.<p>
1129     *
1130     * Any array elements following the last element in the Vector are null.
1131     */

1132    private Message elementData[] = null;
1133
1134    /**
1135     * The number of valid components in this <tt>MessageVector</tt> object.
1136     */

1137    private int elementCount = 0;
1138
1139    /**
1140     * The actual item in this <tt>MessageVector</tt> object.
1141     */

1142    private int current = -1;
1143
1144    /**
1145     * Constructs an empty vector with the specified initial capacity and
1146     * capacity increment.
1147     */

1148    public MessageVector() {
1149    this.elementData = new Message[20];
1150    }
1151
1152    public synchronized Message nextMessage() {
1153      logmon.log(BasicLevel.FATAL, getName() + ", nextMessage:" + toString());
1154
1155      if ((current +1) < elementCount)
1156        return elementData[++current];
1157      else
1158        return null;
1159    }
1160
1161    /**
1162     * Returns the number of message in this vector.
1163     *
1164     * @return the number of message in this vector.
1165     */

1166    public synchronized int size() {
1167      return elementCount;
1168    }
1169
1170    public synchronized void reset() {
1171      current = -1;
1172    }
1173
1174    /**
1175     * Adds the specified component to the end of this vector,
1176     * increasing its size by one. The capacity of this vector is
1177     * increased if its size becomes greater than its capacity. <p>
1178     *
1179     * @param msg the component to be added.
1180     */

1181    public synchronized void addMessage(Message msg) {
1182      logmon.log(BasicLevel.FATAL, getName() + ", addMessage:" + toString());
1183
1184      if ((elementCount + 1) > elementData.length) {
1185        Message oldData[] = elementData;
1186        elementData = new Message[elementData.length * 2];
1187        System.arraycopy(oldData, 0, elementData, 0, elementCount);
1188      }
1189      elementData[elementCount++] = msg;
1190    }
1191
1192    public synchronized void removeCurrent() {
1193      logmon.log(BasicLevel.FATAL, getName() + ", removeCurrent:" + toString());
1194
1195      if (elementCount > (current +1)) {
1196        System.arraycopy(elementData, current +1,
1197                         elementData, current, elementCount - current -1);
1198        
1199      }
1200      elementData[elementCount-1] = null; /* to let gc do its work */
1201      elementCount--;
1202      current--;
1203    }
1204
1205    /**
1206     * Removes a message specified by its stamp. Only remove real message,
1207     * this method don't touch to acknowledge message.
1208     *
1209     * @param stamp the stamp of the message to remove.
1210     */

1211    public synchronized Message removeMessage(int stamp) {
1212      Message msg = null;
1213
1214      logmon.log(BasicLevel.FATAL, getName() + ", removeMessage:" + toString());
1215
1216      for (int index=0 ; index<elementCount ; index++) {
1217        msg = elementData[index];
1218
1219        if ((msg.not != null) && (msg.getStamp() == stamp)) {
1220          if (elementCount > (index +1)) {
1221        System.arraycopy(elementData, index +1,
1222                             elementData, index, elementCount - index - 1);
1223          }
1224          elementData[elementCount-1] = null; /* to let gc do its work */
1225          elementCount--;
1226          // AF: To verify !!
1227
if (index <=current) current--;
1228        
1229          return msg;
1230        }
1231      }
1232      throw new NoSuchElementException();
1233    }
1234
1235    public String JavaDoc toString() {
1236      StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
1237      strbuf.append(super.toString());
1238      strbuf.append(',').append(current);
1239      strbuf.append(',').append(elementCount);
1240      for (int i=0; i<elementCount; i++) {
1241        strbuf.append(",(").append(elementData[i]).append(')');
1242      }
1243      return strbuf.toString();
1244    }
1245  }
1246}
1247
Popular Tags