KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > Network


1 /*
2  * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
3  * Copyright (C) 1996 - 2000 BULL
4  * Copyright (C) 1996 - 2000 INRIA
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Dyade
22  * Contributor(s): ScalAgent Distributed Technologies
23  */

24 package fr.dyade.aaa.agent;
25
26 import java.io.*;
27 import java.util.Vector JavaDoc;
28
29 import org.objectweb.util.monolog.api.BasicLevel;
30 import org.objectweb.util.monolog.api.Logger;
31
32 import fr.dyade.aaa.util.Arrays;
33
34 /**
35  * The <code>Network</code> abstract class provides ..
36  */

37 public abstract class Network implements MessageConsumer, NetworkMBean {
38   /**
39    * Period of time in ms between two activations of watch-dog thread,
40    * default value is 1000L (1 second).
41    * This value can be adjusted for all network components by setting
42    * <code>WDActivationPeriod</code> global property or for a particular
43    * network by setting <code>\<DomainName\>.WDActivationPeriod</code>
44    * specific property.
45    * <p>
46    * Theses properties can be fixed either from <code>java</code> launching
47    * command, or in <code>a3servers.xml</code> configuration file.
48    */

49   long WDActivationPeriod = 1000L;
50
51   /**
52    * Gets the WDActivationPeriod value.
53    *
54    * @return the WDActivationPeriod value
55    */

56   public long getWDActivationPeriod() {
57     return WDActivationPeriod;
58   }
59
60   /**
61    * Sets the WDActivationPeriod value.
62    *
63    * @param WDActivationPeriod the WDActivationPeriod value
64    */

65   public void setWDActivationPeriod(long WDActivationPeriod) {
66     this.WDActivationPeriod = WDActivationPeriod;
67   }
68
69   /**
70    * Number of try at stage 1, default value is 30.
71    * This value can be adjusted for all network components by setting
72    * <code>WDNbRetryLevel1</code> global property or for a particular
73    * network by setting <code>\<DomainName\>.WDNbRetryLevel1</code>
74    * specific property.
75    * <p>
76    * Theses properties can be fixed either from <code>java</code> launching
77    * command, or in <code>a3servers.xml</code> configuration file.
78    */

79   int WDNbRetryLevel1 = 30;
80
81   /**
82    * Gets the WDNbRetryLevel1 value.
83    *
84    * @return the WDNbRetryLevel1 value
85    */

86   public int getWDNbRetryLevel1() {
87     return WDNbRetryLevel1;
88   }
89
90   /**
91    * Sets the WDNbRetryLevel1 value.
92    *
93    * @param WDNbRetryLevel1 the WDNbRetryLevel1 value
94    */

95   public void setWDNbRetryLevel1(int WDNbRetryLevel1) {
96     this.WDNbRetryLevel1 = WDNbRetryLevel1;
97   }
98
99   /**
100    * Period of time in ms between two connection try at stage 1, default
101    * value is WDActivationPeriod divided by 2.
102    * This value can be adjusted for all network components by setting
103    * <code>WDRetryPeriod1</code> global property or for a particular
104    * network by setting <code>\<DomainName\>.WDRetryPeriod1</code>
105    * specific property.
106    * <p>
107    * Theses properties can be fixed either from <code>java</code> launching
108    * command, or in <code>a3servers.xml</code> configuration file.
109    */

110   long WDRetryPeriod1 = WDActivationPeriod/2;
111
112   /**
113    * Gets the WDRetryPeriod1 value.
114    *
115    * @return the WDRetryPeriod1 value
116    */

117   public long getWDRetryPeriod1() {
118     return WDRetryPeriod1;
119   }
120
121   /**
122    * Sets the WDRetryPeriod1 value.
123    *
124    * @param WDRetryPeriod1 the WDRetryPeriod1 value
125    */

126   public void setWDRetryPeriod1(long WDRetryPeriod1) {
127     this.WDRetryPeriod1 = WDRetryPeriod1;
128   }
129
130   /**
131    * Number of try at stage 2, default value is 55.
132    * This value can be adjusted for all network components by setting
133    * <code>WDNbRetryLevel2</code> global property or for a particular
134    * network by setting <code>\<DomainName\>.WDNbRetryLevel2</code>
135    * specific property.
136    * <p>
137    * Theses properties can be fixed either from <code>java</code> launching
138    * command, or in <code>a3servers.xml</code> configuration file.
139    */

140   int WDNbRetryLevel2 = 55;
141
142   /**
143    * Gets the WDNbRetryLevel2 value.
144    *
145    * @return the WDNbRetryLevel2 value
146    */

147   public int getWDNbRetryLevel2() {
148     return WDNbRetryLevel2;
149   }
150
151   /**
152    * Sets the WDNbRetryLevel2 value.
153    *
154    * @param WDNbRetryLevel2 the WDNbRetryLevel2 value
155    */

156   public void setWDNbRetryLevel2(int WDNbRetryLevel2) {
157     this.WDNbRetryLevel2 = WDNbRetryLevel2;
158   }
159
160   /**
161    * Period of time in ms between two connection try at stage 2, default
162    * value is 5000L (5 seconds).
163    * This value can be adjusted for all network components by setting
164    * <code>WDRetryPeriod2</code> global property or for a particular
165    * network by setting <code>\<DomainName\>.WDRetryPeriod2</code>
166    * specific property.
167    * <p>
168    * Theses properties can be fixed either from <code>java</code> launching
169    * command, or in <code>a3servers.xml</code> configuration file.
170    */

171   long WDRetryPeriod2 = 5000L;
172
173   /**
174    * Gets the WDRetryPeriod2 value.
175    *
176    * @return the WDRetryPeriod2 value
177    */

178   public long getWDRetryPeriod2() {
179     return WDRetryPeriod2;
180   }
181
182   /**
183    * Sets the WDRetryPeriod2 value.
184    *
185    * @param WDRetryPeriod2 the WDRetryPeriod2 value
186    */

187   public void setWDRetryPeriod2(long WDRetryPeriod2) {
188     this.WDRetryPeriod2 = WDRetryPeriod2;
189   }
190
191   /**
192    * Period of time in ms between two connection try at stage 3, default
193    * value is 60000L (1 minute).
194    * This value can be adjusted for all network components by setting
195    * <code>WDRetryPeriod3</code> global property or for a particular
196    * network by setting <code>\<DomainName\>.WDRetryPeriod3</code>
197    * specific property.
198    * <p>
199    * Theses properties can be fixed either from <code>java</code> launching
200    * command, or in <code>a3servers.xml</code> configuration file.
201    */

202   long WDRetryPeriod3 = 60000L;
203
204   /**
205    * Gets the WDRetryPeriod3 value.
206    *
207    * @return the WDRetryPeriod3 value
208    */

209   public long getWDRetryPeriod3() {
210     return WDRetryPeriod3;
211   }
212
213   /**
214    * Sets the WDRetryPeriod3 value.
215    *
216    * @param WDRetryPeriod3 the WDRetryPeriod3 value
217    */

218   public void setWDRetryPeriod3(long WDRetryPeriod3) {
219     this.WDRetryPeriod3 = WDRetryPeriod3;
220   }
221
222   /**
223    * Gets the number of waiting messages in this engine.
224    *
225    * return the number of waiting messages.
226    */

227   public int getNbWaitingMessages() {
228     return qout.size();
229   }
230
231   protected Logger logmon = null;
232
233   /** Id. of local server. */
234   protected short sid;
235   /** Index of local server in status and matrix arrays. */
236   protected int idxLS;
237   /**
238    * List of id. for all servers in the domain, this list is sorted and
239    * is used as index for internal tables.
240    */

241   protected short[] servers;
242   /** Filename for servers storage */
243   transient protected String JavaDoc serversFN = null;
244   /** Logical timestamp information for messages in domain, stamp[idxLS)]
245    * for messages sent, and stamp[index(id] for messages received.
246    */

247   private int[] stamp;
248   /** Buffer used to optimise transactions*/
249   private byte[] stampbuf = null;
250   /** */
251   private int[] bootTS = null;
252   /** Filename for boot time stamp storage */
253   transient protected String JavaDoc bootTSFN = null;
254  
255   /** The component's name as it appears in logging. */
256   protected String JavaDoc name;
257   /** The domain name. */
258   protected String JavaDoc domain;
259   /** The communication port. */
260   protected int port;
261   /** The <code>MessageVector</code> associated with this network component. */
262   protected MessageVector qout;
263
264   /**
265    * Returns this session's name.
266    *
267    * @return this session's name.
268    */

269   public final String JavaDoc getName() {
270     return name;
271   }
272
273   /**
274    * Returns the corresponding domain's name.
275    *
276    * @return this domain's name.
277    */

278   public final String JavaDoc getDomainName() {
279     return domain;
280   }
281
282   /**
283    * Returns a string representation of this consumer.
284    *
285    * @return A string representation of this consumer.
286    */

287   public String JavaDoc toString() {
288     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
289     strbuf.append("(").append(super.toString());
290     strbuf.append(",name=").append(getName());
291     if (qout != null) strbuf.append(",qout=").append(qout.size());
292     if (servers != null) {
293       for (int i=0; i<servers.length; i++) {
294         strbuf.append(",(").append(servers[i]).append(',');
295         strbuf.append(stamp[i]).append(')');
296       }
297     }
298     strbuf.append(")");
299
300     return strbuf.toString();
301   }
302
303   /**
304    * Creates a new network component. This simple constructor is required in
305    * order to use <code>Class.newInstance()</code> method during configuration.
306    * The configuration of component is then done by <code>init</code> method.
307    */

308   public Network() {
309   }
310
311   /**
312    * Insert a message in the <code>MessageQueue</code>.
313    * This method is used during initialisation to restore the component
314    * state from persistent storage.
315    *
316    * @param msg the message
317    */

318   public void insert(Message msg) {
319     qout.insert(msg);
320   }
321
322   /**
323    * Saves information to persistent storage.
324    */

325   public void save() throws IOException {}
326
327   /**
328    * Restores component's information from persistent storage.
329    * If it is the first load, initializes it.
330    */

331   public void restore() throws Exception JavaDoc {
332     sid = AgentServer.getServerId();
333     idxLS = index(sid);
334     // Loads the logical clock.
335
stampbuf = AgentServer.getTransaction().loadByteArray(name);
336     if (stampbuf == null) {
337       // Creates the new stamp array and the boot time stamp,
338
stampbuf = new byte[4*servers.length];
339       stamp = new int[servers.length];
340       bootTS = new int[servers.length];
341       // Then initializes them
342
for (int i=0; i<servers.length; i++) {
343         if (i != idxLS) {
344           stamp[i] = -1;
345           bootTS[i] = -1;
346         } else {
347           stamp[i] = 0;
348           bootTS[i] = (int) (System.currentTimeMillis() /1000L);
349         }
350       }
351       // Save the servers configuration and the logical time stamp.
352
AgentServer.getTransaction().save(servers, serversFN);
353       AgentServer.getTransaction().save(bootTS, bootTSFN);
354       AgentServer.getTransaction().saveByteArray(stampbuf, name);
355     } else {
356       // Loads the domain configurations
357
short[] s = (short[]) AgentServer.getTransaction().load(serversFN);
358       bootTS = (int[]) AgentServer.getTransaction().load(bootTSFN);
359       stamp = new int[s.length];
360       for (int i=0; i<stamp.length; i++) {
361         stamp[i] = ((stampbuf[(i*4)+0] & 0xFF) << 24) +
362           ((stampbuf[(i*4)+1] & 0xFF) << 16) +
363           ((stampbuf[(i*4)+2] & 0xFF) << 8) +
364           (stampbuf[(i*4)+3] & 0xFF);
365       }
366       // Joins with the new domain configuration:
367
if ((servers != null) && !Arrays.equals(servers, s)) {
368         for (int i=0; i<servers.length; i++)
369           logmon.log(BasicLevel.DEBUG,
370                      "servers[" + i + "]=" + servers[i]);
371         for (int i=0; i<s.length; i++)
372           logmon.log(BasicLevel.DEBUG,
373                      "servers[" + i + "]=" + s[i]);
374
375         throw new IOException("Network configuration changed");
376       }
377     }
378   }
379
380   /**
381    * Initializes a new network component. This method is used in order to
382    * easily creates and configure a Network component from a class name.
383    * So we can use the <code>Class.newInstance()</code> method for create
384    * (whitout any parameter) the component, then we can initialize it with
385    * this method.<br>
386    * This method initializes the logical clock for the domain.
387    *
388    * @param name The domain name.
389    * @param port The listen port.
390    * @param servers The list of servers directly accessible from this
391    * network interface.
392    */

393   public void init(String JavaDoc name, int port, short[] servers) throws Exception JavaDoc {
394     this.name = AgentServer.getName() + '.' + name;
395
396     qout = new MessageVector(this.name,
397                             AgentServer.getTransaction().isPersistent());
398
399     this.domain = name;
400     this.port = port;
401
402     // Get the logging monitor from current server MonologLoggerFactory
403
// Be careful, logmon is initialized from name and not this.name !!
404
logmon = Debug.getLogger(Debug.A3Network + '.' + name);
405     logmon.log(BasicLevel.DEBUG, name + ", initialized");
406
407     WDActivationPeriod = Long.getLong("WDActivationPeriod",
408                                       WDActivationPeriod).longValue();
409     WDActivationPeriod = Long.getLong(name + ".WDActivationPeriod",
410                                       WDActivationPeriod).longValue();
411
412     WDNbRetryLevel1 = Integer.getInteger("WDNbRetryLevel1",
413                                          WDNbRetryLevel1).intValue();
414     WDNbRetryLevel1 = Integer.getInteger(name + ".WDNbRetryLevel1",
415                                          WDNbRetryLevel1).intValue();
416
417     WDRetryPeriod1 = Long.getLong("WDRetryPeriod1",
418                                   WDRetryPeriod1).longValue();
419     WDRetryPeriod1 = Long.getLong(name + ".WDRetryPeriod1",
420                                   WDRetryPeriod1).longValue();
421
422     WDNbRetryLevel2 = Integer.getInteger("WDNbRetryLevel2",
423                                          WDNbRetryLevel2).intValue();
424     WDNbRetryLevel2 = Integer.getInteger(name + ".WDNbRetryLevel2",
425                                          WDNbRetryLevel2).intValue();
426
427     WDRetryPeriod2 = Long.getLong("WDRetryPeriod2",
428                                   WDRetryPeriod2).longValue();
429     WDRetryPeriod2 = Long.getLong(name + ".WDRetryPeriod2",
430                                   WDRetryPeriod2).longValue();
431
432     WDRetryPeriod3 = Long.getLong("WDRetryPeriod3",
433                                   WDRetryPeriod3).longValue();
434     WDRetryPeriod3 = Long.getLong(name + ".WDRetryPeriod3",
435                                   WDRetryPeriod3).longValue();
436
437     // Sorts the array of server ids into ascending numerical order.
438
Arrays.sort(servers);
439
440     this.servers = servers;
441     this.serversFN = name + "Servers";
442     this.bootTSFN = name + "BootTS";
443
444     restore();
445   }
446
447   /**
448    * Adds the server sid in the network configuration.
449    *
450    * @param id the unique server id.
451    */

452   synchronized void addServer(short id) throws Exception JavaDoc {
453     // First we have to verify that id is not already in servers
454
int idx = index(id);
455     if (idx >= 0) return;
456
457     idx = -idx -1;
458     // Allocates new array for stamp and server
459
int[] newStamp = new int[servers.length+1];
460     byte[] newStampBuf = new byte[4*(servers.length+1)];
461     int[] newBootTS = new int[servers.length+1];
462     short[] newServers = new short[servers.length+1];
463     // Copy old data from stamp and server, let a free room for the new one.
464
int j = 0;
465     for (int i=0; i<servers.length; i++) {
466       if (i == idx) j++;
467       newServers[j] = servers[i];
468       newBootTS[j] = bootTS[i];
469       newStamp[j] = stamp[i];
470       j++;
471     }
472     if (idx > 0)
473       System.arraycopy(stampbuf, 0, newStampBuf, 0, idx*4);
474     if (idx < servers.length)
475       System.arraycopy(stampbuf, idx*4,
476                        newStampBuf, (idx+1)*4, (servers.length-idx)*4);
477
478     newServers[idx] = id;
479     newBootTS[idx] = -1;
480     newStamp[idx] = -1; // useless
481
newStampBuf[idx] = 0; // useless
482
newStampBuf[idx+1] = 0; // useless
483
newStampBuf[idx+2] = 0; // useless
484
newStampBuf[idx+3] = 0; // useless
485

486     stamp = newStamp;
487     stampbuf = newStampBuf;
488     servers = newServers;
489     bootTS = newBootTS;
490     // be careful, set again the index of local server.
491
idxLS = index(sid);
492
493     // Save the servers configuration and the logical time stamp.
494
AgentServer.getTransaction().save(servers, serversFN);
495     AgentServer.getTransaction().save(bootTS, bootTSFN);
496     AgentServer.getTransaction().saveByteArray(stampbuf, name);
497   }
498
499   /**
500    * Removes the server sid in the network configuration.
501    *
502    * @param id the unique server id.
503    */

504   synchronized void delServer(short id) throws Exception JavaDoc {
505     // First we have to verify that id is already in servers
506
int idx = index(id);
507     if (idx < 0) return;
508
509     int[] newStamp = new int[servers.length-1];
510     byte[] newStampBuf = new byte[4*(servers.length-1)];
511     int[] newBootTS = new int[servers.length-1];
512     short[] newServers = new short[servers.length-1];
513
514     int j = 0;
515     for (int i=0; i<servers.length; i++) {
516       if (id == servers[i]) {
517         idx = i;
518         continue;
519       }
520       newServers[j] = servers[i];
521       newBootTS[j] = bootTS[i];
522       newStamp[j] = stamp[i];
523       j++;
524     }
525     if (idx > 0)
526       System.arraycopy(stampbuf, 0, newStampBuf, 0, idx*4);
527     if (idx < (servers.length-1))
528       System.arraycopy(stampbuf, (idx+1)*4,
529                        newStampBuf, idx*4, (servers.length-idx-1)*4);
530
531     stamp = newStamp;
532     stampbuf = newStampBuf;
533     servers = newServers;
534     bootTS = newBootTS;
535     // be careful, set again the index of local server.
536
idxLS = index(sid);
537
538     // Save the servers configuration and the logical time stamp.
539
AgentServer.getTransaction().save(servers, serversFN);
540     AgentServer.getTransaction().save(bootTS, bootTSFN);
541     AgentServer.getTransaction().saveByteArray(stampbuf, name);
542   }
543
544   /**
545    * Reset all information related to server sid in the network configuration.
546    *
547    * @param id the unique server id.
548    */

549   synchronized void resetServer(short id, int boot) throws IOException {
550     // First we have to verify that id is already in servers
551
int idx = index(id);
552     if (idx < 0) return;
553
554     // TODO...
555

556     // Save the servers configuration and the logical time stamp.
557
AgentServer.getTransaction().save(servers, serversFN);
558     AgentServer.getTransaction().save(bootTS, bootTSFN);
559     AgentServer.getTransaction().saveByteArray(stampbuf, name);
560   }
561
562   /**
563    * Adds a message in "ready to deliver" list. This method allocates a
564    * new time stamp to the message ; be Careful, changing the stamp imply
565    * the filename change too.
566    */

567   public void post(Message msg) throws Exception JavaDoc {
568     if ((msg.not.expiration > 0) &&
569         (msg.not.expiration < System.currentTimeMillis())) {
570       if (logmon.isLoggable(BasicLevel.DEBUG))
571         logmon.log(BasicLevel.DEBUG,
572                    getName() + ": removes expired notification " +
573                    msg.from + ", " + msg.not);
574       return;
575     }
576
577     short to = AgentServer.getServerDesc(msg.to.to).gateway;
578     // Allocates a new timestamp. Be careful, if the message needs to be
579
// routed we have to use the next destination in timestamp generation.
580

581     msg.source = AgentServer.getServerId();
582     msg.dest = to;
583     msg.stamp = getSendUpdate(to);
584
585     // Saves the message.
586
msg.save();
587     // Push it in "ready to deliver" queue.
588
qout.push(msg);
589   }
590
591   /**
592    * Returns the index in internal table of the specified server.
593    * The servers array must be ordered.
594    *
595    * @param id the unique server id.
596    */

597   protected final int index(short id) {
598     int idx = Arrays.binarySearch(servers, id);
599     return idx;
600   }
601
602   protected final byte[] getStamp() {
603     return stampbuf;
604   }
605
606   protected final void setStamp(byte[] stampbuf) {
607     this.stampbuf = stampbuf;
608     stamp = new int[servers.length];
609     for (int i=0; i<stamp.length; i++) {
610       stamp[i] = ((stampbuf[(i*4)+0] & 0xFF) << 24) +
611         ((stampbuf[(i*4)+1] & 0xFF) << 16) +
612         ((stampbuf[(i*4)+2] & 0xFF) << 8) +
613         (stampbuf[(i*4)+3] & 0xFF);
614     }
615   }
616
617   private void updateStamp(int idx, int update) throws IOException {
618     stamp[idx] = update;
619     stampbuf[(idx*4)+0] = (byte)((update >>> 24) & 0xFF);
620     stampbuf[(idx*4)+1] = (byte)((update >>> 16) & 0xFF);
621     stampbuf[(idx*4)+2] = (byte)((update >>> 8) & 0xFF);
622     stampbuf[(idx*4)+3] = (byte)(update & 0xFF);
623     AgentServer.getTransaction().saveByteArray(stampbuf, name);
624   }
625
626   /** The message can be delivered. */
627   static final int DELIVER = 0;
628 // /**
629
// * There is other message in the causal ordering before this one.
630
// * This cannot happened with a FIFO ordering.
631
// */
632
// static final int WAIT_TO_DELIVER = 1;
633
/** The message has already been delivered. */
634   static final int ALREADY_DELIVERED = 2;
635
636   /**
637    * Test if a received message with the specified clock must be
638    * delivered. If the message is ready to be delivered, the method returns
639    * <code>DELIVER</code> and the matrix clock is updated. If the message has
640    * already been delivered, the method returns <code>ALREADY_DELIVERED</code>,
641    * and if other messages are waited before this message the method returns
642    * <code>WAIT_TO_DELIVER</code>. In the last two case the matrix clock
643    * remains unchanged.
644    *
645    * @param update The message matrix clock (list of update).
646    * @return <code>DELIVER</code>, <code>ALREADY_DELIVERED</code>,
647    * or <code>WAIT_TO_DELIVER</code> code.
648    */

649   private synchronized int testRecvUpdate(short source, int update) throws IOException {
650     int fromIdx = index(source);
651
652     if (update > stamp[fromIdx]) {
653       updateStamp(fromIdx, update);
654       return DELIVER;
655     }
656     return ALREADY_DELIVERED;
657   }
658
659   /**
660    * Computes the matrix clock of a send message. The server's
661    * matrix clock is updated.
662    *
663    * @param to The identification of receiver.
664    * @return The message matrix clock (list of update).
665    */

666   private synchronized int getSendUpdate(short to) throws IOException {
667     int update = stamp[idxLS] +1;
668     updateStamp(idxLS, update);
669     return update;
670   }
671
672   final int getBootTS() {
673     return bootTS[idxLS];
674   }
675
676   final void testBootTS(short source, int boot) throws IOException {
677     int fromIdx = index(source);
678
679     if (boot != bootTS[fromIdx]) {
680       if (logmon.isLoggable(BasicLevel.WARN))
681         logmon.log(BasicLevel.WARN,
682                    getName() + ", reset stamp #" + source + ", "
683                    + bootTS[fromIdx] + " -> " + boot);
684
685       bootTS[fromIdx] = boot;
686       AgentServer.getTransaction().save(bootTS, bootTSFN);
687       updateStamp(fromIdx, -1);
688     }
689   }
690
691 // int last = -1;
692

693   /**
694    * Try to deliver the received message to the right consumer.
695    *
696    * @param msg the message.
697    */

698   protected void deliver(Message msg) throws Exception JavaDoc {
699     // Get real from serverId.
700
short source = msg.getSource();
701
702     // Test if the message is really for this node (final destination or
703
// router).
704
short dest = msg.getDest();
705     if (dest != AgentServer.getServerId()) {
706       logmon.log(BasicLevel.ERROR,
707                  getName() + ", recv bad msg#" + msg.getStamp() +
708                  " really to " + dest +
709                  " by " + source);
710       throw new Exception JavaDoc("recv bad msg#" + msg.getStamp() +
711                           " really to " + dest +
712                           " by " + source);
713     }
714
715 // if ((last != -1) && (msg.getStamp() != (last +1)))
716
// logmon.log(BasicLevel.FATAL,
717
// getName() + ", recv msg#" + msg.getStamp() + " should be #" + (last +1));
718
// last = msg.getStamp();
719

720     if (logmon.isLoggable(BasicLevel.DEBUG))
721       logmon.log(BasicLevel.DEBUG,
722                  getName() + ", recv msg#" + msg.getStamp() +
723                  " from " + msg.from +
724                  " to " + msg.to +
725                  " by " + source);
726
727     ServerDesc desc = AgentServer.getServerDesc(source);
728     if (! desc.active) {
729       desc.active = true;
730       desc.retry = 0;
731     }
732
733     // Start a transaction in order to ensure atomicity of clock updates
734
// and queue changes.
735
AgentServer.getTransaction().begin();
736
737     // Test if the message can be delivered then deliver it
738
// else put it in the waiting list
739
int todo = testRecvUpdate(source, msg.getStamp());
740
741     if (todo == DELIVER) {
742       // Deliver the message then try to deliver alls waiting message.
743
// Allocate a local time to the message to order it in
744
// local queue, and save it.
745
Channel.post(msg);
746
747       if (logmon.isLoggable(BasicLevel.DEBUG))
748         logmon.log(BasicLevel.DEBUG,
749                    getName() + ", deliver msg#" + msg.getStamp());
750
751       Channel.save();
752       AgentServer.getTransaction().commit();
753       // then commit and validate the message.
754
Channel.validate();
755       AgentServer.getTransaction().release();
756     } else {
757 // it's an already delivered message, we have just to re-send an
758
// aknowledge (see below).
759
AgentServer.getTransaction().commit();
760       AgentServer.getTransaction().release();
761     }
762   }
763
764   /**
765    * Deletes the component, removes all persistent datas. The component
766    * may have been previously stopped, and removed from MessageConsumer
767    * list.
768    * This operation use Transaction calls, you may use commit to validate it.
769    *
770    * @see fr.dyade.aaa.util.Transaction
771    */

772   public void delete() throws IllegalStateException JavaDoc {
773     if (isRunning()) throw new IllegalStateException JavaDoc();
774
775     AgentServer.getTransaction().delete(serversFN);
776     AgentServer.getTransaction().delete(bootTSFN);
777     AgentServer.getTransaction().delete(name);
778   }
779
780   /**
781    * Validates all messages pushed in queue during transaction session.
782    */

783   public void validate() {
784     qout.validate();
785   }
786
787   public MessageQueue getQueue() {
788     return qout;
789   }
790
791   /**
792    * Updates the network port.
793    */

794   public void setPort(int port) {
795     this.port = port;
796   }
797
798   public final int getPort() {
799     return port;
800   }
801 }
802
Popular Tags