KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > SimpleNetwork


1 /*
2  * Copyright (C) 2003 - 2006 ScalAgent Distributed Technologies
3  * Copyright (C) 2004 - France Telecom R&D
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): ScalAgent Distributed Technologies
21  * Contributor(s):
22  */

23 package fr.dyade.aaa.agent;
24
25 import java.io.*;
26 import java.net.*;
27 import java.util.Vector JavaDoc;
28 import java.util.Enumeration JavaDoc;
29
30 import org.objectweb.util.monolog.api.BasicLevel;
31 import org.objectweb.util.monolog.api.Logger;
32
33 import fr.dyade.aaa.util.*;
34
35 /**
36  * <code>SimpleNetwork</code> is a simple implementation of
37  * <code>StreamNetwork</code> class with a single connection at
38  * a time.
39  */

40 public class SimpleNetwork extends StreamNetwork {
41   /** FIFO list of all messages to be sent by the watch-dog thread. */
42   MessageVector sendList;
43
44   private JGroups jgroups = null;
45
46   public void setJGroups(JGroups jgroups) {
47     this.jgroups = jgroups;
48   }
49   
50   void ackMsg(JGroupsAckMsg ack) {
51     try {
52       AgentServer.getTransaction().begin();
53       // Deletes the processed notification
54
qout.remove(ack.getStamp());
55       ack.delete();
56       AgentServer.getTransaction().commit();
57       AgentServer.getTransaction().release();
58       if (this.logmon.isLoggable(BasicLevel.DEBUG))
59         this.logmon.log(BasicLevel.DEBUG,
60                         this.getName() + ", ackMsg(...) done.");
61     } catch (Exception JavaDoc exc) {
62       this.logmon.log(BasicLevel.FATAL,
63                       this.getName() + ", ackMsg unrecoverable exception",
64                       exc);
65     }
66   }
67
/**
 * Creates a new network component. Nothing is initialized here beyond
 * what the <code>StreamNetwork</code> constructor does; the daemons are
 * created by <code>start()</code>.
 */
public SimpleNetwork()
{
  super();
}
74
75   /** Input component */
76   NetServerIn netServerIn = null;
77   /** Output component */
78   NetServerOut netServerOut = null;
79
80   /**
81    * Causes this network component to begin execution.
82    */

83   public void start() throws IOException {
84     logmon.log(BasicLevel.DEBUG, getName() + ", starting");
85     try {
86       if (sendList == null)
87         sendList = new MessageVector(getName(),
88                                      AgentServer.getTransaction().isPersistent());
89     
90       if (netServerIn == null)
91         netServerIn = new NetServerIn(getName(), logmon);
92       if (netServerOut == null)
93         netServerOut = new NetServerOut(getName(), logmon);
94
95       if (! netServerIn.isRunning()) netServerIn.start();
96       if (! netServerOut.isRunning()) netServerOut.start();
97     } catch (IOException exc) {
98       logmon.log(BasicLevel.ERROR, getName() + ", can't start", exc);
99       throw exc;
100     }
101     logmon.log(BasicLevel.DEBUG, getName() + ", started");
102   }
103
104   /**
105    * Forces the network component to stop executing.
106    */

107   public void stop() {
108     if (netServerIn != null) netServerIn.stop();
109     if (netServerOut != null) netServerOut.stop();
110     logmon.log(BasicLevel.DEBUG, getName() + ", stopped");
111   }
112
113   /**
114    * Tests if the network component is alive.
115    *
116    * @return true if this <code>MessageConsumer</code> is alive; false
117    * otherwise.
118    */

119   public boolean isRunning() {
120     if ((netServerIn != null) && netServerIn.isRunning() &&
121     (netServerOut != null) && netServerOut.isRunning())
122       return true;
123     else
124       return false;
125   }
126
127   /**
128    * Returns a string representation of this consumer, including the
129    * daemon's name and status.
130    *
131    * @return A string representation of this consumer.
132    */

133   public String JavaDoc toString() {
134     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
135
136     strbuf.append(super.toString()).append("\n\t");
137     if (netServerIn != null)
138       strbuf.append(netServerIn.toString()).append("\n\t");
139     if (netServerOut != null)
140       strbuf.append(netServerOut.toString()).append("\n\t");
141
142     return strbuf.toString();
143   }
144
// /**
//  * Used to clean the qout of all messages to the dead node.
//  *
//  * @param dead - the unique id. of dead server.
//  */
// void clean(short dead) {
//   Message msg = null;
//   // TODO: Be careful, to the route algorithm!
//   synchronized (lock) {
//     for (int i=0; i<qout.size(); i++) {
//       msg = (Message) qout.getMessageAt(i);
//       if (msg.to.to == dead) {
//         qout.removeMessageAt(i);
//       }
//     }
//   }
// }
162

163   final class NetServerOut extends Daemon {
164     MessageOutputStream nos = null;
165
166     NetServerOut(String JavaDoc name, Logger logmon) {
167       super(name + ".NetServerOut");
168       // Overload logmon definition in Daemon
169
this.logmon = logmon;
170       this.setThreadGroup(AgentServer.getThreadGroup());
171     }
172
173     protected void close() {}
174
175     protected void shutdown() {}
176
177     public void run() {
178       int ret;
179       Message msg = null;
180       short msgto;
181       ServerDesc server = null;
182       InputStream is = null;
183
184       try {
185         try {
186           nos = new MessageOutputStream();
187         } catch (IOException exc) {
188           logmon.log(BasicLevel.FATAL,
189                      getName() + ", cannot start.");
190           return;
191         }
192
193     while (running) {
194           canStop = true;
195           try {
196             if (this.logmon.isLoggable(BasicLevel.DEBUG))
197               this.logmon.log(BasicLevel.DEBUG,
198                               this.getName() + ", waiting message");
199             msg = qout.get(WDActivationPeriod);
200           } catch (InterruptedException JavaDoc exc) {
201             if (this.logmon.isLoggable(BasicLevel.DEBUG))
202               this.logmon.log(BasicLevel.DEBUG,
203                               this.getName() + ", interrupted");
204             continue;
205           }
206           canStop = false;
207           if (! running) break;
208
209           long currentTimeMillis = System.currentTimeMillis();
210           // Try to send waiting messages
211
watchdog(currentTimeMillis);
212
213           if (msg != null) {
214             msgto = msg.getDest();
215             
216             Socket socket = null;
217             try {
218               if (this.logmon.isLoggable(BasicLevel.DEBUG))
219                 this.logmon.log(BasicLevel.DEBUG,
220                                 this.getName() + ", try to send message -> " +
221                                 msg + "/" + msgto);
222
223               if ((msg.not.expiration > 0) &&
224                   (msg.not.expiration < currentTimeMillis)) {
225                 throw new ExpirationExceededException();
226               }
227               
228               // Can throw an UnknownServerException...
229
server = AgentServer.getServerDesc(msgto);
230               try {
231                 if ((! server.active) ||
232                     (server.last > currentTimeMillis)) {
233                   if (this.logmon.isLoggable(BasicLevel.DEBUG))
234                     this.logmon.log(BasicLevel.DEBUG,
235                                     this.getName() + ", AgentServer#" + msgto + " is down");
236                   throw new ConnectException("AgentServer#" + msgto + " is down");
237                 }
238                 
239                 // Open the connection.
240
try {
241                   if (this.logmon.isLoggable(BasicLevel.DEBUG))
242                     this.logmon.log(BasicLevel.DEBUG, this.getName() + ", try to connect");
243
244                   for (Enumeration JavaDoc e = server.getSockAddrs(); e.hasMoreElements();) {
245                     fr.dyade.aaa.util.SocketAddress sa =
246                       (fr.dyade.aaa.util.SocketAddress) e.nextElement();
247                     try {
248                       server.moveToFirst(sa);
249                       socket = createSocket(server);
250                     } catch (IOException ioexc) {
251                       this.logmon.log(BasicLevel.DEBUG,
252                                       this.getName() + ", connection refused with addr=" + server.getAddr()+
253                                       " port=" + server.getPort() +", try next element");
254                       continue;
255                     }
256                     if (this.logmon.isLoggable(BasicLevel.DEBUG))
257                       this.logmon.log(BasicLevel.DEBUG, this.getName() + ", connected");
258                     break;
259                   }
260                   
261                   if (socket == null)
262                     socket = createSocket(server);
263                 } catch (IOException exc) {
264                   this.logmon.log(BasicLevel.WARN,
265                                   this.getName() + ", connection refused", exc);
266                   server.active = false;
267                   server.last = System.currentTimeMillis();
268                   server.retry += 1;
269                   throw exc;
270                 }
271                 setSocketOption(socket);
272               } catch (IOException exc) {
273                 this.logmon.log(BasicLevel.WARN,
274                                 this.getName() + ", move msg in watchdog list", exc);
275                 // There is a connection problem, put the message in a
276
// waiting list.
277
sendList.addMessage(msg);
278                 qout.pop();
279                 continue;
280               }
281               
282               try {
283                 send(socket, msg, currentTimeMillis);
284               } catch (IOException exc) {
285                 this.logmon.log(BasicLevel.WARN,
286                                 this.getName() + ", move msg in watchdog list", exc);
287                 // There is a problem during network transaction, put the
288
// message in waiting list in order to retry later.
289
sendList.addMessage(msg);
290                 qout.pop();
291                 continue;
292               }
293             } catch (UnknownServerException exc) {
294               this.logmon.log(BasicLevel.ERROR,
295                               this.getName() + ", can't send message: " + msg,
296                               exc);
297               // Remove the message (see below), may be we have to post an
298
// error notification to sender.
299
} catch (ExpirationExceededException exc) {
300               if (logmon.isLoggable(BasicLevel.DEBUG))
301                 logmon.log(BasicLevel.DEBUG,
302                            getName() + ": removes expired notification " +
303                            msg.from + ", " + msg.not);
304             }
305
306             AgentServer.getTransaction().begin();
307             // Suppress the processed notification from message queue,
308
// and deletes it.
309
qout.pop();
310             // send ack in JGroups to delete msg
311
if (jgroups != null)
312               jgroups.send(new JGroupsAckMsg(msg));
313             msg.delete();
314             msg.free();
315             AgentServer.getTransaction().commit();
316             AgentServer.getTransaction().release();
317           }
318         }
319       } catch (Exception JavaDoc exc) {
320         this.logmon.log(BasicLevel.FATAL,
321                         this.getName() + ", unrecoverable exception", exc);
322         // There is an unrecoverable exception during the transaction
323
// we must exit from server.
324
AgentServer.stop(false);
325       } finally {
326         finish();
327       }
328     }
329
// /** The date of the last watchdog execution. */
// private long last = 0L;
332

333     /*
334      *
335      * @exception IOException unrecoverable exception during transaction.
336      */

337     void watchdog(long currentTimeMillis) throws IOException {
338 // this.logmon.log(BasicLevel.DEBUG,
339
// this.getName() + " watchdog().");
340

341 // if (currentTimeMillis < (last + WDActivationPeriod))
342
// return;
343
// last = currentTimeMillis;
344

345       ServerDesc server = null;
346
347       for (int i=0; i<sendList.size(); i++) {
348         Message msg = (Message) sendList.getMessageAt(i);
349         short msgto = msg.getDest();
350
351         if (this.logmon.isLoggable(BasicLevel.DEBUG))
352           this.logmon.log(BasicLevel.DEBUG,
353                           this.getName() +
354                           ", check msg#" + msg.getStamp() +
355                           " from " + msg.from +
356                           " to " + msg.to);
357
358         if ((msg.not.expiration > 0) &&
359             (msg.not.expiration < currentTimeMillis)) {
360           if (logmon.isLoggable(BasicLevel.DEBUG))
361             logmon.log(BasicLevel.DEBUG,
362                        getName() + ": removes expired notification " +
363                        msg.from + ", " + msg.not);
364
365           // Remove the message.
366
AgentServer.getTransaction().begin();
367           // Deletes the processed notification
368
sendList.removeMessageAt(i); i--;
369 // AF: A reprendre.
370
// // send ack in JGroups to delete msg
371
// if (jgroups != null)
372
// jgroups.send(new JGroupsAckMsg(msg));
373
msg.delete();
374           msg.free();
375           AgentServer.getTransaction().commit();
376           AgentServer.getTransaction().release();
377         }
378
379         try {
380           server = AgentServer.getServerDesc(msgto);
381         } catch (UnknownServerException exc) {
382           this.logmon.log(BasicLevel.ERROR,
383                           this.getName() + ", can't send message: " + msg,
384                           exc);
385           // Remove the message, may be we have to post an error
386
// notification to sender.
387
AgentServer.getTransaction().begin();
388           // Deletes the processed notification
389
sendList.removeMessageAt(i); i--;
390 // AF: A reprendre.
391
// // send ack in JGroups to delete msg
392
// if (jgroups != null)
393
// jgroups.send(new JGroupsAckMsg(msg));
394
msg.delete();
395           msg.free();
396           AgentServer.getTransaction().commit();
397           AgentServer.getTransaction().release();
398
399           continue;
400         }
401
402         if (server.last > currentTimeMillis) {
403           // The server has already been tested during this round
404
continue;
405         }
406
407         if ((server.active) ||
408             ((server.retry < WDNbRetryLevel1) &&
409              ((server.last + WDRetryPeriod1) < currentTimeMillis)) ||
410             ((server.retry < WDNbRetryLevel2) &&
411              ((server.last + WDRetryPeriod2) < currentTimeMillis)) ||
412             ((server.last + WDRetryPeriod3) < currentTimeMillis)) {
413           try {
414             if (this.logmon.isLoggable(BasicLevel.DEBUG))
415               this.logmon.log(BasicLevel.DEBUG,
416                               this.getName() +
417                               ", send msg#" + msg.getStamp());
418
419             // Open the connection.
420
Socket socket = createSocket(server);
421             // The connection is ok, reset active and retry flags.
422
server.active = true;
423             server.retry = 0;
424             // Reset last in order to allow sending of following messages
425
// to the same server.
426
server.last = currentTimeMillis;
427
428             setSocketOption(socket);
429
430             send(socket, msg, currentTimeMillis);
431           } catch (SocketException exc) {
432             if (this.logmon.isLoggable(BasicLevel.WARN))
433               this.logmon.log(BasicLevel.WARN,
434                               this.getName() + ", let msg in watchdog list",
435                               exc);
436             server.active = false;
437             server.retry += 1;
438             // Set last in order to avoid the sending of following messages to
439
// same server.
440
server.last = currentTimeMillis +1;
441             // There is a connection problem, let the message in the
442
// waiting list.
443
continue;
444           } catch (Exception JavaDoc exc) {
445             this.logmon.log(BasicLevel.ERROR,
446                             this.getName() + ", error", exc);
447           }
448
449           AgentServer.getTransaction().begin();
450           // Deletes the processed notification
451
sendList.removeMessageAt(i); i--;
452 // AF: A reprendre.
453
// // send ack in JGroups to delete msg
454
// if (jgroups != null)
455
// jgroups.send(new JGroupsAckMsg(msg));
456
msg.delete();
457           msg.free();
458           AgentServer.getTransaction().commit();
459           AgentServer.getTransaction().release();
460         } else {
461           // Set last in order to avoid the sending of following messages to
462
// same server.
463
server.last = currentTimeMillis +1;
464         }
465       }
466     }
467
468     public void send(Socket socket,
469                      Message msg,
470                      long currentTimeMillis) throws IOException {
471       int ret;
472       InputStream is = null;
473
474       try {
475         // Send the message,
476
if (this.logmon.isLoggable(BasicLevel.DEBUG))
477           this.logmon.log(BasicLevel.DEBUG,
478                           this.getName() + ", write message");
479         nos.writeMessage(socket, msg, currentTimeMillis);
480         // and wait the acknowledge.
481
if (this.logmon.isLoggable(BasicLevel.DEBUG))
482           this.logmon.log(BasicLevel.DEBUG,
483                           this.getName() + ", wait ack");
484         is = socket.getInputStream();
485         if ((ret = is.read()) == -1)
486           throw new ConnectException("Connection broken");
487
488         if (this.logmon.isLoggable(BasicLevel.DEBUG))
489           this.logmon.log(BasicLevel.DEBUG,
490                           this.getName() + ", receive ack");
491       } finally {
492         try {
493           socket.getOutputStream().close();
494         } catch (Exception JavaDoc exc) {}
495         try {
496           is.close();
497         } catch (Exception JavaDoc exc) {}
498         try {
499           socket.close();
500         } catch (Exception JavaDoc exc) {}
501       }
502     }
503   }
504
505   final class NetServerIn extends Daemon {
506     ServerSocket listen = null;
507
508     NetServerIn(String JavaDoc name, Logger logmon) throws IOException {
509       super(name + ".NetServerIn");
510       listen = createServerSocket();
511       // Overload logmon definition in Daemon
512
this.logmon = logmon;
513       this.setThreadGroup(AgentServer.getThreadGroup());
514     }
515
516     protected void close() {
517       try {
518     listen.close();
519       } catch (Exception JavaDoc exc) {}
520     }
521
522     protected void shutdown() {
523       close();
524     }
525
526     public void run() {
527       Socket socket = null;
528       OutputStream os = null;
529       ObjectInputStream ois = null;
530       byte[] iobuf = new byte[29];
531
532       try {
533     while (running) {
534       try {
535         canStop = true;
536
537         // Get the connection
538
try {
539               if (this.logmon.isLoggable(BasicLevel.DEBUG))
540                 this.logmon.log(BasicLevel.DEBUG,
541                                 this.getName() + ", waiting connection");
542           socket = listen.accept();
543         } catch (IOException exc) {
544           continue;
545         }
546         canStop = false;
547
548         setSocketOption(socket);
549
550             if (this.logmon.isLoggable(BasicLevel.DEBUG))
551               this.logmon.log(BasicLevel.DEBUG,
552                               this.getName() + ", connected");
553
554         // Read the message,
555
os = socket.getOutputStream();
556             InputStream is = socket.getInputStream();
557
558             Message msg = Message.alloc();
559             int n = 0;
560             do {
561               int count = is.read(iobuf, n, Message.LENGTH +4 - n);
562               if (count < 0) throw new EOFException();
563               n += count;
564             } while (n < (Message.LENGTH +4));
565
566             // Reads boot timestamp of source server
567
int boot = ((iobuf[0] & 0xFF) << 24) +
568               ((iobuf[1] & 0xFF) << 16) +
569               ((iobuf[2] & 0xFF) << 8) +
570               ((iobuf[3] & 0xFF) << 0);
571             
572             int idx = msg.readFromBuf(iobuf, 4);
573
574             // Reads notification attributes
575
boolean persistent = ((iobuf[idx] & Message.PERSISTENT) == 0)?false:true;
576             boolean detachable = ((iobuf[idx] & Message.DETACHABLE) == 0)?false:true;
577
578             // Reads notification object
579
ois = new ObjectInputStream(is);
580             msg.not = (Notification) ois.readObject();
581             if (msg.not.expiration > 0)
582               msg.not.expiration += System.currentTimeMillis();
583             msg.not.persistent = persistent;
584             msg.not.detachable = detachable;
585             msg.not.detached = false;
586
587             if (this.logmon.isLoggable(BasicLevel.DEBUG))
588               this.logmon.log(BasicLevel.DEBUG,
589                               this.getName() + ", msg received");
590
591             testBootTS(msg.getSource(), boot);
592             deliver(msg);
593
594             if (this.logmon.isLoggable(BasicLevel.DEBUG))
595               this.logmon.log(BasicLevel.DEBUG, this.getName() + ", send ack");
596
597         // then send the acknowledge.
598
os.write(0);
599             os.flush();
600       } catch (Exception JavaDoc exc) {
601             this.logmon.log(BasicLevel.ERROR,
602                             this.getName() + ", closed", exc);
603       } finally {
604         try {
605           os.close();
606         } catch (Exception JavaDoc exc) {}
607         os = null;
608         try {
609           ois.close();
610         } catch (Exception JavaDoc exc) {}
611         ois = null;
612         try {
613           socket.close();
614         } catch (Exception JavaDoc exc) {}
615         socket = null;
616       }
617     }
618       } finally {
619         finish();
620       }
621     }
622   }
623
624   /**
625    * Class used to send messages through a TCP stream.
626    */

627   final class MessageOutputStream extends ByteArrayOutputStream {
628     private ObjectOutputStream oos = null;
629     private OutputStream os = null;
630
631     MessageOutputStream() throws IOException {
632       super(256);
633       oos = new ObjectOutputStream(this);
634       count = 0;
635       buf[Message.LENGTH +4] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF);
636       buf[Message.LENGTH +5] = (byte)((ObjectStreamConstants.STREAM_MAGIC >>> 0) & 0xFF);
637       buf[Message.LENGTH +6] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF);
638       buf[Message.LENGTH +7] = (byte)((ObjectStreamConstants.STREAM_VERSION >>> 0) & 0xFF);
639     }
640
641     void writeMessage(Socket sock,
642                       Message msg,
643                       long currentTimeMillis) throws IOException {
644       os = sock.getOutputStream();
645
646       // Writes boot timestamp of source server
647
buf[0] = (byte) (getBootTS() >>> 24);
648       buf[1] = (byte) (getBootTS() >>> 16);
649       buf[2] = (byte) (getBootTS() >>> 8);
650       buf[3] = (byte) (getBootTS() >>> 0);
651
652       int idx = msg.writeToBuf(buf, 4);
653       // Writes notification attributes
654
buf[idx++] = (byte) ((msg.not.persistent?Message.PERSISTENT:0) |
655                            (msg.not.detachable?Message.DETACHABLE:0));
656
657       // Be careful, the stream header is hard-written in buf
658
count = Message.LENGTH +8;
659
660       try {
661         if (msg.not.expiration > 0)
662           msg.not.expiration -= currentTimeMillis;
663         oos.writeObject(msg.not);
664         oos.reset();
665         oos.flush();
666         os.write(buf, 0, count);;
667         os.flush();
668       } finally {
669         if (msg.not.expiration > 0)
670           msg.not.expiration += currentTimeMillis;
671         count = 0;
672       }
673     }
674   }
675 }
676
Popular Tags