KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > DistributedQueue


1 // $Id: DistributedQueue.java,v 1.15 2005/02/25 07:06:07 belaban Exp $
2
package org.jgroups.blocks;
3
4 import org.apache.commons.logging.Log;
5 import org.apache.commons.logging.LogFactory;
6 import org.jgroups.*;
7 import org.jgroups.util.RspList;
8 import org.jgroups.util.Util;
9
10 import java.io.Serializable JavaDoc;
11 import java.util.*;
12
13
14 /**
15  * Provides the abstraction of a java.util.LinkedList that is replicated at several
16  * locations. Any change to the list (reset, add, remove etc) will transparently be
17  * propagated to all replicas in the group. All read-only methods will always access the
18  * local replica.<p>
19  * Both keys and values added to the list <em>must be serializable</em>, the reason
20  * being that they will be sent across the network to all replicas of the group.
21  * An instance of this class will contact an existing member of the group to fetch its
22  * initial state.
23  * Beware to use a <em>total protocol</em> on initialization or elements would not be in same
24  * order on all replicas.
25  * @author Romuald du Song
26  */

27 public class DistributedQueue implements MessageListener, MembershipListener, Cloneable JavaDoc
28 {
29     public interface Notification
30     {
31         void entryAdd(Object JavaDoc value);
32
33         void entryRemoved(Object JavaDoc key);
34
35         void viewChange(Vector new_mbrs, Vector old_mbrs);
36
37         void contentsCleared();
38
39         void contentsSet(Collection new_entries);
40     }
41
42     protected Log logger = LogFactory.getLog(getClass());
43     private long internal_timeout = 10000; // 10 seconds to wait for a response
44

45     /*lock object for synchronization*/
46     protected Object JavaDoc mutex = new Object JavaDoc();
47     protected transient boolean stopped = false; // whether to we are stopped !
48
protected LinkedList internalQueue;
49     protected transient Channel channel;
50     protected transient RpcDispatcher disp = null;
51     protected transient String JavaDoc groupname = null;
52     protected transient Vector notifs = new Vector(); // to be notified when mbrship changes
53
protected transient Vector members = new Vector(); // keeps track of all DHTs
54
private transient Class JavaDoc[] add_signature = null;
55     private transient Class JavaDoc[] addAtHead_signature = null;
56     private transient Class JavaDoc[] addAll_signature = null;
57     private transient Class JavaDoc[] reset_signature = null;
58     private transient Class JavaDoc[] remove_signature = null;
59     
60     /**
61      * Creates a DistributedQueue
62      * @param groupname The name of the group to join
63      * @param factory The ChannelFactory which will be used to create a channel
64      * @param properties The property string to be used to define the channel
65      * @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever.
66      */

67     public DistributedQueue(String JavaDoc groupname, ChannelFactory factory, String JavaDoc properties, long state_timeout)
68                      throws ChannelException
69     {
70         if (logger.isDebugEnabled())
71         {
72             logger.debug("DistributedQueue(" + groupname + ',' + properties + ',' + state_timeout);
73         }
74
75         this.groupname = groupname;
76         initSignatures();
77         internalQueue = new LinkedList();
78         channel = (factory != null) ? factory.createChannel(properties) : new JChannel(properties);
79         disp = new RpcDispatcher(channel, this, this, this);
80         disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
81
channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
82         channel.connect(groupname);
83         start(state_timeout);
84     }
85
86     public DistributedQueue(JChannel channel)
87     {
88         this.groupname = channel.getChannelName();
89         this.channel = channel;
90         init();
91     }
92
93     /**
94       * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
95       * used to register under that id. This is typically used when another building block is already using
96       * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
97       * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
98       * first block created on PullPushAdapter.
99       * The caller needs to call start(), before using the this block. It gives the opportunity for the caller
100       * to register as a lessoner for Notifications events.
101       * @param adapter The PullPushAdapter which to use as underlying transport
102       * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
103       * requests/responses for different building blocks on top of PullPushAdapter.
104       */

105     public DistributedQueue(PullPushAdapter adapter, Serializable JavaDoc id)
106     {
107         this.channel = (Channel)adapter.getTransport();
108         this.groupname = this.channel.getChannelName();
109
110         initSignatures();
111         internalQueue = new LinkedList();
112
113         channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
114         disp = new RpcDispatcher(adapter, id, this, this, this);
115         disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
116
}
117
118     protected void init()
119     {
120         initSignatures();
121         internalQueue = new LinkedList();
122         channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
123         disp = new RpcDispatcher(channel, this, this, this);
124         disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
125
}
126
127     public void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException
128     {
129         boolean rc;
130         logger.debug("DistributedQueue.initState(" + groupname + "): starting state retrieval");
131
132         rc = channel.getState(null, state_timeout);
133
134         if (rc)
135         {
136             logger.info("DistributedQueue.initState(" + groupname + "): state was retrieved successfully");
137         }
138         else
139         {
140             logger.info("DistributedQueue.initState(" + groupname + "): state could not be retrieved (first member)");
141         }
142     }
143
144     public Address getLocalAddress()
145     {
146         return (channel != null) ? channel.getLocalAddress() : null;
147     }
148
149     public Channel getChannel()
150     {
151         return channel;
152     }
153
154     public void addNotifier(Notification n)
155     {
156         if (n != null && !notifs.contains(n))
157         {
158             notifs.addElement(n);
159         }
160     }
161
162     public void removeNotifier(Notification n)
163     {
164         notifs.removeElement(n);
165     }
166
167     public void stop()
168     {
169         /*lock the queue from other threads*/
170         synchronized (mutex)
171         {
172             internalQueue.clear();
173
174             if (disp != null)
175             {
176                 disp.stop();
177                 disp = null;
178             }
179
180             if (channel != null)
181             {
182                 channel.close();
183                 channel = null;
184             }
185
186             stopped = true;
187         }
188     }
189
190     /**
191      * Add the speficied element at the bottom of the queue
192      * @param value
193      */

194     public void add(Object JavaDoc value)
195     {
196         try
197         {
198             Object JavaDoc retval = null;
199
200             RspList rsp = disp.callRemoteMethods(null, "_add", new Object JavaDoc[]{value}, add_signature, GroupRequest.GET_ALL, 0);
201             Vector results = rsp.getResults();
202
203             if (results.size() > 0)
204             {
205                 retval = results.elementAt(0);
206
207                 if (logger.isDebugEnabled())
208                 {
209                     checkResult(rsp, retval);
210                 }
211             }
212         }
213          catch (Exception JavaDoc e)
214         {
215             logger.error("Unable to add value " + value, e);
216         }
217
218         return;
219     }
220
221     /**
222      * Add the speficied element at the top of the queue
223      * @param value
224      */

225     public void addAtHead(Object JavaDoc value)
226     {
227         try
228         {
229             disp.callRemoteMethods(null, "_addAtHead", new Object JavaDoc[]{value}, addAtHead_signature, GroupRequest.GET_ALL, 0);
230         }
231          catch (Exception JavaDoc e)
232         {
233             logger.error("Unable to addAtHead value " + value, e);
234         }
235
236         return;
237     }
238
239     /**
240      * Add the speficied collection to the top of the queue.
241      * Elements are added in the order that they are returned by the specified
242      * collection's iterator.
243      * @param values
244      */

245     public void addAll(Collection values)
246     {
247         try
248         {
249             disp.callRemoteMethods(null, "_addAll", new Object JavaDoc[]{values}, addAll_signature, GroupRequest.GET_ALL, 0);
250         }
251          catch (Exception JavaDoc e)
252         {
253             logger.error("Unable to addAll value: " + values, e);
254         }
255
256         return;
257     }
258
259     public Vector getContents()
260     {
261         Vector result = new Vector();
262
263         for (Iterator e = internalQueue.iterator(); e.hasNext();)
264             result.add(e.next());
265
266         return result;
267     }
268
269     public int size()
270     {
271         return internalQueue.size();
272     }
273
274     /**
275       * returns the first object on the queue, without removing it.
276       * If the queue is empty this object blocks until the first queue object has
277       * been added
278       * @return the first object on the queue
279       */

280     public Object JavaDoc peek()
281     {
282         Object JavaDoc retval = null;
283
284         try
285         {
286             retval = internalQueue.getFirst();
287         }
288          catch (NoSuchElementException e)
289         {
290         }
291
292         return retval;
293     }
294
295     public void reset()
296     {
297         try
298         {
299             disp.callRemoteMethods(null, "_reset", null, reset_signature, GroupRequest.GET_ALL, 0);
300         }
301          catch (Exception JavaDoc e)
302         {
303             logger.error("DistributedQueue.reset(" + groupname + ')', e);
304         }
305     }
306
307     protected void checkResult(RspList rsp, Object JavaDoc retval)
308     {
309         if (logger.isDebugEnabled())
310         {
311             logger.debug("Value updated from " + groupname + " :" + retval);
312         }
313
314         Vector results = rsp.getResults();
315
316         for (int i = 0; i < results.size(); i++)
317         {
318             Object JavaDoc data = results.elementAt(i);
319
320             if (!data.equals(retval))
321             {
322                 logger.error("Reference value differs from returned value " + retval + " != " + data);
323             }
324         }
325     }
326
327     /**
328      * Try to return the first objet in the queue.It does not wait for an object.
329      * @return the first object in the queue or null if none were found.
330      */

331     public Object JavaDoc remove()
332     {
333         Object JavaDoc retval = null;
334         RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout);
335         Vector results = rsp.getResults();
336
337         if (results.size() > 0)
338         {
339             retval = results.elementAt(0);
340
341             if (logger.isDebugEnabled())
342             {
343                 checkResult(rsp, retval);
344             }
345         }
346
347         return retval;
348     }
349
350     /**
351      * @param timeout The time to wait until an entry is retrieved in milliseconds. A value of 0 means wait forever.
352      * @return the first object in the queue or null if none were found
353      */

354     public Object JavaDoc remove(long timeout)
355     {
356         Object JavaDoc retval = null;
357         long start = System.currentTimeMillis();
358
359         if (timeout <= 0)
360         {
361             while (!stopped && (retval == null))
362             {
363                 RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout);
364                 Vector results = rsp.getResults();
365
366                 if (results.size() > 0)
367                 {
368                     retval = results.elementAt(0);
369
370                     if (logger.isDebugEnabled())
371                     {
372                         checkResult(rsp, retval);
373                     }
374                 }
375
376                 if (retval == null)
377                 {
378                     try
379                     {
380                         synchronized (mutex)
381                         {
382                             mutex.wait();
383                         }
384                     }
385                      catch (InterruptedException JavaDoc e)
386                     {
387                     }
388                 }
389             }
390         }
391         else
392         {
393             while (((System.currentTimeMillis() - start) < timeout) && !stopped && (retval == null))
394             {
395                 RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout);
396                 Vector results = rsp.getResults();
397
398                 if (results.size() > 0)
399                 {
400                     retval = results.elementAt(0);
401
402                     if (logger.isDebugEnabled())
403                     {
404                         checkResult(rsp, retval);
405                     }
406                 }
407
408                 if (retval == null)
409                 {
410                     try
411                     {
412                         long delay = timeout - (System.currentTimeMillis() - start);
413
414                         synchronized (mutex)
415                         {
416                             if (delay > 0)
417                             {
418                                 mutex.wait(delay);
419                             }
420                         }
421                     }
422                      catch (InterruptedException JavaDoc e)
423                     {
424                     }
425                 }
426             }
427         }
428
429         return retval;
430     }
431
432     public String JavaDoc toString()
433     {
434         return internalQueue.toString();
435     }
436
437     /*------------------------ Callbacks -----------------------*/
438     public void _add(Object JavaDoc value)
439     {
440         if (logger.isDebugEnabled())
441         {
442             logger.debug(groupname + '@' + getLocalAddress() + " _add(" + value + ')');
443         }
444
445         /*lock the queue from other threads*/
446         synchronized (mutex)
447         {
448             internalQueue.add(value);
449
450             /*wake up all the threads that are waiting for the lock to be released*/
451             mutex.notifyAll();
452         }
453
454         for (int i = 0; i < notifs.size(); i++)
455             ((Notification)notifs.elementAt(i)).entryAdd(value);
456     }
457
458     public void _addAtHead(Object JavaDoc value)
459     {
460         /*lock the queue from other threads*/
461         synchronized (mutex)
462         {
463             internalQueue.addFirst(value);
464
465             /*wake up all the threads that are waiting for the lock to be released*/
466             mutex.notifyAll();
467         }
468
469         for (int i = 0; i < notifs.size(); i++)
470             ((Notification)notifs.elementAt(i)).entryAdd(value);
471     }
472
473     public void _reset()
474     {
475         if (logger.isDebugEnabled())
476         {
477             logger.debug(groupname + '@' + getLocalAddress() + " _reset()");
478         }
479
480         _private_reset();
481
482         for (int i = 0; i < notifs.size(); i++)
483             ((Notification)notifs.elementAt(i)).contentsCleared();
484     }
485
486     protected void _private_reset()
487     {
488         /*lock the queue from other threads*/
489         synchronized (mutex)
490         {
491             internalQueue.clear();
492
493             /*wake up all the threads that are waiting for the lock to be released*/
494             mutex.notifyAll();
495         }
496     }
497
498     public Object JavaDoc _remove()
499     {
500         Object JavaDoc retval = null;
501
502         try
503         {
504             /*lock the queue from other threads*/
505             synchronized (mutex)
506             {
507                 retval = internalQueue.removeFirst();
508
509                 /*wake up all the threads that are waiting for the lock to be released*/
510                 mutex.notifyAll();
511             }
512
513             if (logger.isDebugEnabled())
514             {
515                 logger.debug(groupname + '@' + getLocalAddress() + "_remove(" + retval + ')');
516             }
517
518             for (int i = 0; i < notifs.size(); i++)
519                 ((Notification)notifs.elementAt(i)).entryRemoved(retval);
520         }
521          catch (NoSuchElementException e)
522         {
523             logger.debug(groupname + '@' + getLocalAddress() + "_remove(): nothing to remove");
524         }
525
526         return retval;
527     }
528
529     public void _addAll(Collection c)
530     {
531         if (logger.isDebugEnabled())
532         {
533             logger.debug(groupname + '@' + getLocalAddress() + " _addAll(" + c + ')');
534         }
535
536         /*lock the queue from other threads*/
537         synchronized (mutex)
538         {
539             internalQueue.addAll(c);
540
541             /*wake up all the threads that are waiting for the lock to be released*/
542             mutex.notifyAll();
543         }
544
545         for (int i = 0; i < notifs.size(); i++)
546             ((Notification)notifs.elementAt(i)).contentsSet(c);
547     }
548
549     /*----------------------------------------------------------*/
550     /*-------------------- State Exchange ----------------------*/
551     public void receive(Message msg)
552     {
553     }
554
555     public byte[] getState()
556     {
557         Vector copy = (Vector)getContents().clone();
558
559         try
560         {
561             return Util.objectToByteBuffer(copy);
562         }
563          catch (Throwable JavaDoc ex)
564         {
565             logger.error("DistributedQueue.getState(): exception marshalling state.", ex);
566
567             return null;
568         }
569     }
570
571     public void setState(byte[] new_state)
572     {
573         Vector new_copy;
574
575         try
576         {
577             new_copy = (Vector)Util.objectFromByteBuffer(new_state);
578
579             if (new_copy == null)
580             {
581                 return;
582             }
583         }
584          catch (Throwable JavaDoc ex)
585         {
586             logger.error("DistributedQueue.setState(): exception unmarshalling state.", ex);
587
588             return;
589         }
590
591         _private_reset(); // remove all elements
592
_addAll(new_copy);
593     }
594
595     /*------------------- Membership Changes ----------------------*/
596     public void viewAccepted(View new_view)
597     {
598         Vector new_mbrs = new_view.getMembers();
599
600         if (new_mbrs != null)
601         {
602             sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
603
members.removeAllElements();
604
605             for (int i = 0; i < new_mbrs.size(); i++)
606                 members.addElement(new_mbrs.elementAt(i));
607         }
608     }
609
610     /** Called when a member is suspected */
611     public void suspect(Address suspected_mbr)
612     {
613         ;
614     }
615
616     /** Block sending and receiving of messages until ViewAccepted is called */
617     public void block()
618     {
619     }
620
621     void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs)
622     {
623         Vector joined;
624         Vector left;
625         Object JavaDoc mbr;
626         Notification n;
627
628         if ((notifs.size() == 0) || (old_mbrs == null) || (new_mbrs == null) || (old_mbrs.size() == 0) ||
629                 (new_mbrs.size() == 0))
630         {
631             return;
632         }
633
634         // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
635
joined = new Vector();
636
637         for (int i = 0; i < new_mbrs.size(); i++)
638         {
639             mbr = new_mbrs.elementAt(i);
640
641             if (!old_mbrs.contains(mbr))
642             {
643                 joined.addElement(mbr);
644             }
645         }
646
647         // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
648
left = new Vector();
649
650         for (int i = 0; i < old_mbrs.size(); i++)
651         {
652             mbr = old_mbrs.elementAt(i);
653
654             if (!new_mbrs.contains(mbr))
655             {
656                 left.addElement(mbr);
657             }
658         }
659
660         for (int i = 0; i < notifs.size(); i++)
661         {
662             n = (Notification)notifs.elementAt(i);
663             n.viewChange(joined, left);
664         }
665     }
666
667     void initSignatures()
668     {
669         try
670         {
671             if (add_signature == null)
672             {
673                 add_signature = new Class JavaDoc[] { Object JavaDoc.class };
674             }
675
676             if (addAtHead_signature == null)
677             {
678                 addAtHead_signature = new Class JavaDoc[] { Object JavaDoc.class };
679             }
680
681             if (addAll_signature == null)
682             {
683                 addAll_signature = new Class JavaDoc[] { Collection.class };
684             }
685
686             if (reset_signature == null)
687             {
688                 reset_signature = new Class JavaDoc[0];
689             }
690
691             if (remove_signature == null)
692             {
693                 remove_signature = new Class JavaDoc[0];
694             }
695         }
696          catch (Throwable JavaDoc ex)
697         {
698             logger.error("DistributedQueue.initMethods()", ex);
699         }
700     }
701
702     public static void main(String JavaDoc[] args)
703     {
704         try
705         {
706             // The setup here is kind of weird:
707
// 1. Create a channel
708
// 2. Create a DistributedQueue (on the channel)
709
// 3. Connect the channel (so the HT gets a VIEW_CHANGE)
710
// 4. Start the HT
711
//
712
// A simpler setup is
713
// DistributedQueue ht = new DistributedQueue("demo", null,
714
// "file://c:/JGroups-2.0/conf/total-token.xml", 5000);
715
JChannel c = new JChannel("file:/c:/JGroups-2.0/conf/conf/total-token.xml");
716             c.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
717
718             DistributedQueue ht = new DistributedQueue(c);
719             c.connect("demo");
720             ht.start(5000);
721
722             ht.add("name");
723             ht.add("Michelle Ban");
724
725             Object JavaDoc old_key = ht.remove();
726             System.out.println("old key was " + old_key);
727             old_key = ht.remove();
728             System.out.println("old value was " + old_key);
729
730             ht.add("name 'Michelle Ban'");
731
732             System.out.println("queue is " + ht);
733         }
734          catch (Throwable JavaDoc t)
735         {
736             t.printStackTrace();
737         }
738     }
739 }
740
Popular Tags