KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > dream > queue > BufferAscendingSequenceNumberImpl


1 /**
2  * Dream
3  * Copyright (C) 2003-2004 INRIA Rhone-Alpes
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Contact: dream@objectweb.org
20  *
21  * Initial developer(s): Vivien Quema
22  * Contributor(s):
23  */

24
25 package org.objectweb.dream.queue;
26
27 import org.objectweb.dream.message.Message;
28 import org.objectweb.dream.message.manager.MessageManager;
29 import org.objectweb.dream.pool.ObjectPool;
30 import org.objectweb.dream.pool.Recyclable;
31 import org.objectweb.fractal.api.NoSuchInterfaceException;
32 import org.objectweb.fractal.api.control.IllegalBindingException;
33 import org.objectweb.fractal.api.control.IllegalLifeCycleException;
34
35 /**
36  * Implementation of the <code>Buffer</code> interface using a sorted list.
37  * This buffer sorts messages according to a sequence number. For that purpose
38  * all messages must contain a {@link SequenceNumberChunk}chunk. The name under
39  * which this chunk is registered is specified using the
40  * {@link QueueSortedAttributeController}.
41  * <p>
42  * This buffer guarantees that if message <i>m1 </i> (with sequence number <i>sn
43  * </i>) is removed from the buffer, then next message to be removed will have
44  * sequence number <i>sn+1 </i>. As a consequence, the number of available
45  * message may be different from the number of stored messages.
46  * <p>
47  * <p>
48  * <i>Note: </i>This buffer DOES NOT ALLOW overflow policies that drop messages.
49  * <i>Indicators </i> on available space, available messages, and stored
50  * messages are expressed as number of messages.
51  *
52  * @see Buffer
53  * @see AbstractBufferImpl @
54  */

55 public class BufferAscendingSequenceNumberImpl extends AbstractBufferImpl
56     implements
57       QueueSortedAttributeController
58 {
59   /**
60    * The name under which the chunk containing the sequence number is
61    * registered.
62    */

63   protected String JavaDoc sortingChunkName;
64
65   /** The first element stored in this buffer. */
66   protected Element first = null;
67
68   /** The last element stored in this buffer. */
69   protected Element last = null;
70
71   /** The client interface used to access the pool of {@link Element}instances. */
72   protected ObjectPool objectPoolItf;
73
74   /** The last sequence number in sequence. */
75   protected long lastInSequence = -1;
76
77   // ---------------------------------------------------------------------------
78
// Implemented methods.
79
// ---------------------------------------------------------------------------
80

81   /**
82    * @see org.objectweb.dream.queue.AbstractBufferImpl#hasAvailableMessage()
83    */

84   protected boolean hasAvailableMessage()
85   {
86     return (availableMessagesIndicator > 0);
87   }
88
89   /**
90    * @see org.objectweb.dream.queue.AbstractBufferImpl#canAdd(org.objectweb.dream.message.Message)
91    */

92   protected boolean canAdd(Message message)
93   {
94     if (maxCapacity > 0 && (maxCapacity - storedMessagesIndicator <= 0))
95     {
96       return false;
97     }
98     return true;
99   }
100
101   // ---------------------------------------------------------------------------
102
// Implemented methods for the Buffer interface.
103
// ---------------------------------------------------------------------------
104

105   /**
106    * @see org.objectweb.dream.queue.AbstractBufferImpl#doAdd(org.objectweb.dream.message.Message)
107    */

108   protected void doAdd(Message message)
109   {
110     long sequenceNumber = ((SequenceNumberChunk) message
111         .getChunk(sortingChunkName)).getSequenceNumber();
112     if (sequenceNumber <= lastInSequence)
113     {
114       // The message has already been stored or delivered.
115
messageManagerItf.deleteMessage(message);
116       return;
117     }
118     Element current = last;
119     while (current != null && sequenceNumber < current.sequenceNumber)
120     {
121       current = current.previous;
122     }
123
124     if (current != null && sequenceNumber == current.sequenceNumber)
125     {
126       // The message is already stored in the buffer.
127
messageManagerItf.deleteMessage(message);
128       return;
129     }
130     Element element = (Element) objectPoolItf.newInstance();
131     element.msg = message;
132     element.sequenceNumber = sequenceNumber;
133
134     if (current == null)
135     {
136       // insert at the beginning
137
if (first != null)
138       {
139         // list not empty
140
element.next = first;
141         first.previous = element;
142       }
143       else
144       {
145         // list is empty, first = last
146
last = element;
147       }
148       first = element;
149     }
150     else
151     {
152       // insert after current
153
element.previous = current;
154       if (current.next != null)
155       {
156         // new element is not the new last
157
element.next = current.next;
158         element.next.previous = element;
159       }
160       else
161       {
162         // insert at the end
163
last = element;
164       }
165       element.previous.next = element;
166
167     }
168
169     // update last in sequence
170
// logger.log(BasicLevel.INFO, "inserted element " +
171
// element.sequenceNumber);
172
current = element;
173     int counter = 0;
174     while (current != null && current.sequenceNumber == lastInSequence + 1)
175     {
176       counter++;
177       lastInSequence++;
178       current = current.next;
179     }
180     if (counter > 0)
181     {
182       incrementAvailableMessagesIndicator(counter);
183     }
184     incrementStoredMessagesIndicator(1);
185   }
186
187   /**
188    * @see org.objectweb.dream.queue.AbstractBufferImpl#doRemove()
189    */

190   protected Message doRemove()
191   {
192     incrementAvailableMessagesIndicator(-1);
193     incrementStoredMessagesIndicator(-1);
194     Element newFirst = first.next;
195     if (newFirst != null)
196     {
197       newFirst.previous = null;
198     }
199     else
200     {
201       last = null;
202     }
203     Message msg = first.msg;
204     objectPoolItf.recycleInstance(first);
205     first = newFirst;
206     return msg;
207   }
208
209   /**
210    * @see org.objectweb.dream.queue.AbstractBufferImpl#doGet()
211    */

212   protected Message doGet()
213   {
214     return first.msg;
215   }
216
217   // ---------------------------------------------------------------------------
218
// Implementation of the QueueSortedAttributeController interface.
219
// ---------------------------------------------------------------------------
220

221   /**
222    * @see org.objectweb.dream.queue.QueueSortedAttributeController#getSortingChunkName()
223    */

224   public String JavaDoc getSortingChunkName()
225   {
226     return sortingChunkName;
227   }
228
229   /**
230    * @see QueueSortedAttributeController#setSortingChunkName(String)
231    */

232   public void setSortingChunkName(String JavaDoc name)
233   {
234     this.sortingChunkName = name;
235   }
236
237   // ---------------------------------------------------------------------------
238
// Inner class.
239
// ---------------------------------------------------------------------------
240

241   /**
242    * This class represents elements of a linked list. Each element contains a
243    * reference to its predecessor (<code>previous</code>), its successor (
244    * <code>next</code>), a message, and a sequence number.
245    */

246   public static class Element implements Recyclable
247   {
248     Element previous;
249     Element next;
250     Message msg;
251     long sequenceNumber;
252
253     // ---------------------------------------------------------------------------
254
// Implementation of the Recyclable interface.
255
// ---------------------------------------------------------------------------
256

257     /**
258      * @see Recyclable#recycle()
259      */

260     public void recycle()
261     {
262       previous = null;
263       next = null;
264       msg = null;
265       sequenceNumber = -1;
266     }
267   }
268
269   // ---------------------------------------------------------------------------
270
// Implementation of the BindingController interface.
271
// ---------------------------------------------------------------------------
272

273   /**
274    * @see org.objectweb.fractal.api.control.BindingController#bindFc(String,
275    * Object)
276    */

277   public synchronized void bindFc(String JavaDoc clientItfName, Object JavaDoc serverItf)
278       throws NoSuchInterfaceException, IllegalBindingException,
279       IllegalLifeCycleException
280   {
281     super.bindFc(clientItfName, serverItf);
282     if (clientItfName.equals(ObjectPool.ITF_NAME))
283     {
284       objectPoolItf = (ObjectPool) serverItf;
285     }
286   }
287
288   /**
289    * @see org.objectweb.fractal.api.control.BindingController#listFc()
290    */

291   public String JavaDoc[] listFc()
292   {
293     return new String JavaDoc[]{MessageManager.ITF_NAME, ObjectPool.ITF_NAME};
294   }
295 }
Popular Tags