KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > fr > dyade > aaa > agent > MessageVector


1 /*
2  * Copyright (C) 2004 - 2005 ScalAgent Distributed Technologies
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
17  * USA.
18  *
19  * Initial developer(s): ScalAgent Distributed Technologies
20  * Contributor(s):
21  */

22 package fr.dyade.aaa.agent;
23
24 import java.io.*;
25
26 import org.objectweb.util.monolog.api.BasicLevel;
27 import org.objectweb.util.monolog.api.Logger;
28
29 import fr.dyade.aaa.util.EmptyQueueException;
30
31 /**
32  * Class <code>MessageVector</code> represents a persistent vector of
33  * <tt>Message</tt> (source and target agent identifier, notification).
34  * As messages have a relatively short life span, then the messages
35  * are kept in main memory. If possible, the list is backed by a persistent
36  * image on the disk for reliability needs. In this case, we can use
37  * <tt>SoftReference</tt> to avoid memory overflow.<p><hr>
38  * The stamp information in Message is used to restore the queue from
39  * persistent storage at initialization time, so there is no longer need
40  * to save <code>MessageVector</code> object state.
41  */

42 final class MessageVector implements MessageQueue {
43   private Logger logmon = null;
44   private String JavaDoc logmsg = null;
45   private long cpt1, cpt2;
46
47   /**
48    * The array buffer into which the <code>Message</code> objects are stored
49    * in memory. The capacity of this array buffer is at least large enough to
50    * contain all the messages of the <code>MessageVector</code>.<p>
51    * Messages are stored in a circular way, first one in <tt>data[first]</tt>
52    * through <tt>data[(first+count-1)%length]</tt>. Any other array elements
53    * are null.
54    */

55   private Object JavaDoc data[];
56   /** The index of the first message in the circular buffer. */
57   private int first;
58   /**
59    * The number of messages in this <tt>MessageVector</tt> object. Components
60    * <tt>data[first]</tt> through <tt>data[(first+count)%length]</tt> are the
61    * actual items.
62    */

63   private int count;
64   /** The number of validated message in this <tt>MessageQueue</tt>. */
65   private int validated;
66
67   private boolean persistent;
68
69   MessageVector(String JavaDoc name, boolean persistent) {
70     logmon = Debug.getLogger(getClass().getName() + '.' + name);
71     logmsg = name + ".MessageVector: ";
72
73     this.persistent = persistent;
74     data = new Object JavaDoc[50];
75     first = 0;
76     count = 0;
77     validated = 0;
78   }
79
80   /**
81    * Insert a message in the queue, it should only be used during
82    * initialization for restoring the queue state.
83    *
84    * @param item the message to be pushed onto this queue.
85    */

86   public synchronized void insert(Message item) {
87     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
88       logmon.log(BasicLevel.DEBUG, logmsg + "insert(" + item + ")");
89
90     int i = 0;
91     for (; i<validated; i++) {
92       Message msg = getMessageAt(i);
93       if (item.getStamp() < msg.getStamp()) break;
94     }
95     insertMessageAt(item, i);
96     validated += 1;
97   }
98
99   /**
100    * Pushes a message onto the bottom of this queue. It should only
101    * be used during a transaction. The item will be really available
102    * after the transaction commit and the queue validate.
103    *
104    * @param item the message to be pushed onto this queue.
105    */

106   public synchronized void push(Message item) {
107     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
108       logmon.log(BasicLevel.DEBUG, logmsg + "push(" + item + ")");
109     addMessage(item);
110   }
111
112   /**
113    * Removes the message at the top of this queue.
114    * It must only be used during a transaction.
115    *
116    * @return The message at the top of this queue.
117    * @exception EmptyQueueException if this queue is empty.
118    */

119   public synchronized Message pop() throws EmptyQueueException {
120     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
121       logmon.log(BasicLevel.DEBUG, logmsg + "pop()");
122
123     if (validated == 0)
124       throw new EmptyQueueException();
125     
126     Message item = getMessageAt(0);
127     removeMessageAt(0);
128     validated -= 1;
129
130     return item;
131   }
132
133   /**
134    * Atomicaly validates all messages pushed in queue during a reaction.
135    * It must only be used during a transaction.
136    */

137   public synchronized void validate() {
138     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
139       logmon.log(BasicLevel.DEBUG, logmsg + "validate()");
140     validated = size();
141     notify();
142   }
143
144   /**
145    * Looks at the message at the top of this queue without removing
146    * it from the queue.
147    * It should never be used during a transaction to avoid dead-lock
148    * problems.
149    *
150    * @return the message at the top of this queue.
151    * @exception InterruptedException if another thread has interrupted the
152    * current thread.
153    */

154   public synchronized Message get() throws InterruptedException JavaDoc {
155     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) {
156       logmon.log(BasicLevel.DEBUG, logmsg + "get()");
157
158       cpt1 += 1; cpt2 += validated;
159       if ((cpt1 & 0xFFFFL) == 0L) {
160           logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated);
161       }
162     }
163     
164     while (validated == 0) {
165       wait();
166     }
167     Message item = getMessageAt(0);
168  
169     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
170       logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item);
171
172    return item;
173   }
174
175   /**
176    * Looks at the message at the top of this queue without removing
177    * it from the queue. It should never be used during a transaction
178    * to avoid dead-lock problems. It waits until a message is available
179    * or the specified amount of time has elapsed.
180    *
181    * @param timeout the maximum time to wait in milliseconds.
182    * @return the message at the top of this queue.
183    * @exception InterruptedException if another thread has interrupted the
184    * current thread.
185    * @exception IllegalArgumentException if the value of timeout is negative.
186    */

187   public synchronized Message get(long timeout) throws InterruptedException JavaDoc {
188     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) {
189       logmon.log(BasicLevel.DEBUG, logmsg + "get(" + timeout + ")");
190
191       cpt1 += 1; cpt2 += validated;
192       if ((cpt1 & 0xFFFFL) == 0L) {
193         logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated);
194       }
195     }
196     
197     Message item = null;
198     if ((validated == 0) && (timeout > 0)) wait(timeout);
199     if (validated > 0) item = getMessageAt(0);
200
201     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
202       logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item);
203
204     return item;
205   }
206
207   /**
208    * Looks at the first message of this queue where the destination server
209    * is the specified one.
210    * The message is not removed from the queue. It should never be used during
211    * a transaction to avoid dead-lock problems.
212    *
213    * @param to the unique server id.
214    * @return the corresponding message or null if none .
215    */

216   public synchronized Message getMessageTo(short to) {
217     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) {
218       logmon.log(BasicLevel.DEBUG, logmsg + "getFrom(" + to + ")");
219
220       cpt1 += 1; cpt2 += validated;
221       if ((cpt1 & 0xFFFFL) == 0L) {
222         logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated);
223       }
224     }
225     
226     Message item = null;
227     for (int i=0; i<validated; i++) {
228       Message msg = getMessageAt(i);
229       if (msg.getDest() == to) {
230         item = msg;
231         break;
232       }
233     }
234
235     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
236       logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item);
237
238     return item;
239   }
240
241   /**
242    * Removes the specified message from the queue if exists.
243    *
244    * @param msg the message to remove.
245    */

246   synchronized void removeMessage(Message msg) {
247     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
248       logmon.log(BasicLevel.DEBUG,
249                  logmsg + "removeMessage #" + msg.getStamp());
250
251     for (int i = 0; i<validated; i++) {
252       if (getMessageAt(i) == msg) {
253
254         if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
255           logmon.log(BasicLevel.DEBUG,
256                      logmsg + "removeMessage #" + msg.getStamp() + " -> " + i);
257
258         removeMessageAt(i);
259         validated -= 1;
260         return;
261       }
262     }
263
264     logmon.log(BasicLevel.ERROR,
265                logmsg + "removeMessage #" + msg.getStamp() + " not found");
266
267     return;
268   }
269
270   /**
271    * Removes all messages with a stamp less than the specified one.
272    * Be careful with the use of this method, in particular it does not
273    * take in account the multiples incoming nodes.
274    */

275   synchronized int remove(int stamp) {
276     if (validated == 0) return 0;
277     
278     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
279       logmon.log(BasicLevel.DEBUG, logmsg + "remove #" + stamp);
280
281     int i = 0;
282     for (; i<validated; i++) {
283       Message msg = getMessageAt(i);
284       if (stamp < msg.getStamp()) break;
285     }
286
287     for (int j=0; j<i; j++) {
288       removeMessageAt(0);
289     }
290     validated -= i;
291     
292     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
293       logmon.log(BasicLevel.DEBUG, logmsg + "remove #" + stamp + " ->" +i);
294
295     return i;
296   }
297
298   /**
299    * Inserts the specified message to this <code>MessageVector</code> at
300    * the specified index. Each component in this vector with an index greater
301    * or equal to the specified index is shifted upward.
302    *
303    * @param item the message to be pushed onto this queue.
304    * @param index where to insert the new message.
305    */

306   void insertMessageAt(Message item, int index) {
307     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
308       logmon.log(BasicLevel.DEBUG,
309                  logmsg + "insertMessageAt(" + item + ", " + index + ")");
310
311     if (count == data.length) {
312       Object JavaDoc newData[] = new Object JavaDoc[data.length *2];
313       if ((first + count) < data.length) {
314         System.arraycopy(data, first, newData, 0, count);
315       } else {
316         int j = data.length - first;
317         System.arraycopy(data, first, newData, 0, j);
318         System.arraycopy(data, 0, newData, j, count - j);
319       }
320       first = 0;
321       data = newData;
322     }
323     if (index != count)
324       System.arraycopy(data, index, data, index + 1, count - index);
325     if (persistent)
326       data[(first + index)%data.length] = new MessageSoftRef(item);
327     else
328       data[(first + index)%data.length] = item;
329     count += 1;
330
331     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
332       logmon.log(BasicLevel.DEBUG,
333                  logmsg + "insertMessageAt() -> " + this);
334   }
335
336   /**
337    * Adds the specified message to the end of internal <code>Vector</code>.
338    *
339    * @param item the message to be added onto this queue.
340    */

341   void addMessage(Message item) {
342     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
343       logmon.log(BasicLevel.DEBUG,
344                  logmsg + "addMessage(" + item + ")");
345
346     insertMessageAt(item, count);
347   }
348
349   /**
350    * Returns the message at the specified index.
351    *
352    * @param index the index of the message.
353    * @return The message at the top of this queue.
354    */

355   Message getMessageAt(int index) {
356     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
357       logmon.log(BasicLevel.DEBUG, logmsg + "getMessageAt(" + index + ")");
358
359     int idx = (first + index)%data.length;
360     if (persistent) {
361       Message msg = ((MessageSoftRef) data[idx]).getMessage();
362       if (msg == null) {
363         msg = ((MessageSoftRef) data[idx]).loadMessage();
364         data[idx] = new MessageSoftRef(msg);
365       }
366       return msg;
367     } else {
368       return (Message) data[idx];
369     }
370   }
371
372   /**
373    * Deletes the message at the specified index.
374    *
375    * @param index the index of the message to remove.
376    */

377   void removeMessageAt(int index) {
378     if ((first + index) < data.length) {
379       // Moves the start of the vector +1 to the empty 'box'
380
System.arraycopy(data, first,
381                        data, first +1, index);
382       // Erase the old first 'box'
383
data[first] = null; /* to let gc do its work */
384       // Move the first ptr +1, and decrease counter
385
first = (first +1)%data.length;
386       count -= 1;
387     } else {
388       // Moves the end of the vector -1 to the empty 'box'
389
System.arraycopy(data, (first + index)%data.length +1,
390                        data, (first + index)%data.length, count - index -1);
391       // Erase the old last 'box'
392
data[(first + count -1)%data.length] = null; /* to let gc do its work */
393       // Decrease counter
394
count -= 1;
395     }
396     if (count == 0) first = 0;
397
398     if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
399       logmon.log(BasicLevel.DEBUG,
400                  logmsg + "removeMessageAt(" + index + ") -> " + this);
401   }
402
403   /**
404    * Returns the number of messages in this vector.
405    *
406    * @return the number of messages in this vector.
407    */

408   public int size() {
409     return count;
410   }
411
412   /**
413    * Returns a string representation of this <code>MessageVector</code>
414    * object. Be careful we scan the vector without synchronization, so the
415    * result can be incoherent.
416    *
417    * @return A string representation of this object.
418    */

419   public String JavaDoc toString() {
420     StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
421     
422     strbuf.append('(').append(super.toString());
423     strbuf.append(",first=").append(first);
424     strbuf.append(",count=").append(count);
425     strbuf.append(",validated=").append(validated).append(",(");
426     for (int i=0; i<data.length; i++) {
427       strbuf.append(data[i]).append(',');
428     }
429     strbuf.append("))");
430     
431     return strbuf.toString();
432   }
433   
434   final class MessageSoftRef extends java.lang.ref.SoftReference JavaDoc {
435     /**
436      * Name for persistent message, used to retrieve garbaged message
437      * from persistent storage.
438      */

439     String JavaDoc name = null;
440     /**
441      * Reference for transient message, used to pin non persistent
442      * in memory.
443      */

444     Message ref = null;
445     
446     MessageSoftRef(Message msg) {
447       super(msg);
448       if (msg.isPersistent())
449         name = msg.toStringId();
450       else
451         ref = msg;
452     }
453
454     /**
455      * Returns this reference message's referent. If the message has been
456      * swap out it returns null.
457      *
458      * @return The message to which this reference refers.
459      */

460     public Message getMessage() {
461       if (ref != null) return ref;
462       return (Message) get();
463     }
464
465     /**
466      * Loads from disk this reference message's referent if the message
467      * has been swap out. It should be called only after a getMessage
468      * returning null.
469      *
470      * @return The message to which this reference refers.
471      */

472     public Message loadMessage() throws TransactionError {
473       if (ref != null) return ref;
474
475       Message msg;
476       try {
477         msg = Message.load(name);
478
479         if (logmon.isLoggable(BasicLevel.DEBUG))
480           logmon.log(BasicLevel.DEBUG, logmsg + "reload from disk " + msg);
481       } catch (Exception JavaDoc exc) {
482         logmon.log(BasicLevel.ERROR,
483                    logmsg + "Can't load message " + name, exc);
484         throw new TransactionError(exc);
485       }
486       return msg;
487     }
488
489     /**
490      * Returns a string representation of this <code>MessageSoftRef</code>
491      * object.
492      *
493      * @return A string representation of this object.
494      */

495     public String JavaDoc toString() {
496       StringBuffer JavaDoc strbuf = new StringBuffer JavaDoc();
497       
498       strbuf.append('(').append(super.toString());
499       strbuf.append(",name=").append(name);
500       strbuf.append(",ref=").append(ref);
501       strbuf.append("))");
502       
503       return strbuf.toString();
504     }
505   }
506
507   final class TransactionError extends Error JavaDoc {
508     TransactionError(Throwable JavaDoc cause) {
509       super(cause);
510     }
511   }
512 }
513
Popular Tags