KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > DistributedTree


// $Id: DistributedTree.java,v 1.12.2.1 2005/05/20 09:53:30 mark_riley99 Exp $

package org.jgroups.blocks;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.util.Util;

import java.io.Serializable;
import java.util.StringTokenizer;
import java.util.Vector;


18 /**
19  * A tree-like structure that is replicated across several members. Updates will be multicast to all group
20  * members reliably and in the same order.
21  * @author Bela Ban
22  * @author <a HREF="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a>
23  * @deprecated This class is unsupported; use JBossCache instead: http://www.jboss.com/products/jbosscache
24  */

25 public class DistributedTree implements MessageListener, MembershipListener {
26     Node root=null;
27     final Vector JavaDoc listeners=new Vector JavaDoc();
28     final Vector JavaDoc view_listeners=new Vector JavaDoc();
29     final Vector JavaDoc members=new Vector JavaDoc();
30     protected Channel channel=null;
31     protected RpcDispatcher disp=null;
32     String JavaDoc groupname="DistributedTreeGroup";
33     String JavaDoc channel_properties="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=0):" +
34             "PING(timeout=5000;num_initial_members=6):" +
35             "FD_SOCK:" +
36             "VERIFY_SUSPECT(timeout=1500):" +
37             "pbcast.STABLE(desired_avg_gossip=10000):" +
38             "pbcast.NAKACK(gc_lag=5;retransmit_timeout=3000;trace=true):" +
39             "UNICAST(timeout=5000):" +
40             "FRAG(down_thread=false;up_thread=false):" +
41             "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
42             "shun=false;print_local_addr=true):" +
43             "pbcast.STATE_TRANSFER(trace=true)";
44     final long state_timeout=5000; // wait 5 secs max to obtain state
45

46     /** Determines when the updates have to be sent across the network, avoids sending unnecessary
47      * messages when there are no member in the group */

48     private boolean send_message = false;
49
50     protected static final Log log=LogFactory.getLog(DistributedTree.class);
51
52
53
54     public interface DistributedTreeListener {
55         void nodeAdded(String JavaDoc fqn, Serializable JavaDoc element);
56
57         void nodeRemoved(String JavaDoc fqn);
58
59         void nodeModified(String JavaDoc fqn, Serializable JavaDoc old_element, Serializable JavaDoc new_element);
60     }
61
62
63     public interface ViewListener {
64         void viewChange(Vector JavaDoc new_mbrs, Vector JavaDoc old_mbrs);
65     }
66
67
68     public DistributedTree() {
69     }
70
71
72     public DistributedTree(String JavaDoc groupname, String JavaDoc channel_properties) {
73         this.groupname=groupname;
74         if(channel_properties != null)
75             this.channel_properties=channel_properties;
76     }
77
78     /*
79      * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
80      * used to register under that id. This is typically used when another building block is already using
81      * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
82      * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
83      * first block created on PullPushAdapter.
84      * @param adapter The PullPushAdapter which to use as underlying transport
85      * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
86      * requests/responses for different building blocks on top of PullPushAdapter.
87      * @param state_timeout Max number of milliseconds to wait until state is
88      * retrieved
89      */

90     public DistributedTree(PullPushAdapter adapter, Serializable JavaDoc id, long state_timeout)
91         throws ChannelException {
92         channel = (Channel)adapter.getTransport();
93         disp=new RpcDispatcher(adapter, id, this, this, this);
94         channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
95         boolean rc = channel.getState(null, state_timeout);
96         if(rc) {
97             if(log.isInfoEnabled()) log.info("state was retrieved successfully");
98         }
99         else
100             if(log.isInfoEnabled()) log.info("state could not be retrieved (must be first member in group)");
101     }
102
103     public Object JavaDoc getLocalAddress() {
104         return channel != null? channel.getLocalAddress() : null;
105     }
106
107     public void setDeadlockDetection(boolean flag) {
108         if(disp != null)
109             disp.setDeadlockDetection(flag);
110     }
111
112     public void start() throws Exception JavaDoc {
113         start(8000);
114     }
115
116
117     public void start(long timeout) throws Exception JavaDoc {
118         if(channel != null) // already started
119
return;
120         channel=new JChannel(channel_properties);
121         disp=new RpcDispatcher(channel, this, this, this);
122         channel.connect(groupname);
123         channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
124         boolean rc=channel.getState(null, timeout);
125         if(rc) {
126             if(log.isInfoEnabled()) log.info("state was retrieved successfully");
127         }
128         else
129             if(log.isInfoEnabled()) log.info("state could not be retrieved (must be first member in group)");
130     }
131
132
133     public void stop() {
134         if(channel != null) {
135             channel.close();
136             disp.stop();
137         }
138         channel=null;
139         disp=null;
140     }
141
142
143     public void addDistributedTreeListener(DistributedTreeListener listener) {
144         if(!listeners.contains(listener))
145             listeners.addElement(listener);
146     }
147
148
149     public void removeDistributedTreeListener(DistributedTreeListener listener) {
150         listeners.removeElement(listener);
151     }
152
153
154     public void addViewListener(ViewListener listener) {
155         if(!view_listeners.contains(listener))
156             view_listeners.addElement(listener);
157     }
158
159
160     public void removeViewListener(ViewListener listener) {
161         view_listeners.removeElement(listener);
162     }
163
164
165     public void add(String JavaDoc fqn) {
166         //Changes done by <aos>
167
//if true, propagate action to the group
168
if(send_message == true) {
169             try {
170                 MethodCall call = new MethodCall("_add", new Object JavaDoc[] {fqn}, new String JavaDoc[] {String JavaDoc.class.getName()});
171                 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, 0);
172             }
173             catch(Exception JavaDoc ex) {
174                 if(log.isErrorEnabled()) log.error("exception=" + ex);
175             }
176         }
177         else {
178             _add(fqn);
179         }
180     }
181
182     public void add(String JavaDoc fqn, Serializable JavaDoc element) {
183         add(fqn, element, 0);
184     }
185
186     /** resets an existing node, useful after a merge when you want to tell other
187      * members of your state, but do not wish to remove and then add as two separate calls */

188     public void reset(String JavaDoc fqn, Serializable JavaDoc element)
189     {
190         reset(fqn, element, 0);
191     }
192
193     public void remove(String JavaDoc fqn) {
194         remove(fqn, 0);
195     }
196
197     public void add(String JavaDoc fqn, Serializable JavaDoc element, int timeout) {
198         //Changes done by <aos>
199
//if true, propagate action to the group
200
if(send_message == true) {
201             try {
202                 MethodCall call = new MethodCall("_add", new Object JavaDoc[] {fqn, element},
203                     new String JavaDoc[] {String JavaDoc.class.getName(), Serializable JavaDoc.class.getName()});
204                 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);
205             }
206             catch(Exception JavaDoc ex) {
207                 if(log.isErrorEnabled()) log.error("exception=" + ex);
208             }
209         }
210         else {
211             _add(fqn, element);
212         }
213     }
214
215     /** resets an existing node, useful after a merge when you want to tell other
216      * members of your state, but do not wish to remove and then add as two separate calls */

217     public void reset(String JavaDoc fqn, Serializable JavaDoc element, int timeout)
218     {
219         //Changes done by <aos>
220
//if true, propagate action to the group
221
if(send_message == true) {
222             try {
223                 MethodCall call = new MethodCall("_reset", new Object JavaDoc[] {fqn, element},
224                     new String JavaDoc[] {String JavaDoc.class.getName(), Serializable JavaDoc.class.getName()});
225                 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);
226             }
227             catch(Exception JavaDoc ex) {
228                 if(log.isErrorEnabled()) log.error("exception=" + ex);
229             }
230         }
231         else {
232             _add(fqn, element);
233         }
234     }
235
236     public void remove(String JavaDoc fqn, int timeout) {
237         //Changes done by <aos>
238
//if true, propagate action to the group
239
if(send_message == true) {
240             try {
241                 MethodCall call = new MethodCall("_remove", new Object JavaDoc[] {fqn}, new String JavaDoc[] {String JavaDoc.class.getName()});
242                 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);
243             }
244             catch(Exception JavaDoc ex) {
245                 if(log.isErrorEnabled()) log.error("exception=" + ex);
246             }
247         }
248         else {
249             _remove(fqn);
250         }
251     }
252
253
254     public boolean exists(String JavaDoc fqn) {
255         if(fqn == null)
256             return false;
257         return findNode(fqn) == null? false : true;
258     }
259
260
261     public Serializable JavaDoc get(String JavaDoc fqn) {
262         Node n=null;
263
264         if(fqn == null) return null;
265         n=findNode(fqn);
266         if(n != null) {
267             return n.element;
268         }
269         return null;
270     }
271
272
273     public void set(String JavaDoc fqn, Serializable JavaDoc element) {
274         set(fqn, element, 0);
275     }
276
277     public void set(String JavaDoc fqn, Serializable JavaDoc element, int timeout) {
278         //Changes done by <aos>
279
//if true, propagate action to the group
280
if(send_message == true) {
281             try {
282                 MethodCall call = new MethodCall("_set", new Object JavaDoc[] {fqn, element},
283                     new String JavaDoc[] {String JavaDoc.class.getName(), Serializable JavaDoc.class.getName()});
284                 disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);
285             }
286             catch(Exception JavaDoc ex) {
287                 if(log.isErrorEnabled()) log.error("exception=" + ex);
288             }
289         }
290         else {
291             _set(fqn, element);
292         }
293     }
294
295
296     /** Returns all children of a Node as strings */
297     public Vector JavaDoc getChildrenNames(String JavaDoc fqn) {
298         Vector JavaDoc ret=new Vector JavaDoc();
299         Node n;
300
301         if(fqn == null) return ret;
302         n=findNode(fqn);
303         if(n == null || n.children == null) return ret;
304         for(int i=0; i < n.children.size(); i++)
305             ret.addElement(((Node)n.children.elementAt(i)).name);
306         return ret;
307     }
308
309
310     public String JavaDoc print() {
311         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
312         int indent=0;
313
314         if(root == null)
315             return "/";
316
317         sb.append(root.print(indent));
318         return sb.toString();
319     }
320
321
322     /** Returns all children of a Node as Nodes */
323     Vector JavaDoc getChildren(String JavaDoc fqn) {
324         Node n;
325
326         if(fqn == null) return null;
327         n=findNode(fqn);
328         if(n == null) return null;
329         return n.children;
330     }
331
332     /**
333      * Returns the name of the group that the DistributedTree is connected to
334      * @return String
335      */

336     public String JavaDoc getGroupName() {return groupname;}
337         
338     /**
339      * Returns the Channel the DistributedTree is connected to
340      * @return Channel
341      */

342     public Channel getChannel() {return channel;}
343
344    /**
345      * Returns the number of current members joined to the group
346      * @return int
347      */

348     public int getGroupMembersNumber() {return members.size();}
349
350
351
352
353     /*--------------------- Callbacks --------------------------*/
354
355     public void _add(String JavaDoc fqn) {
356         _add(fqn, null);
357     }
358
359
360     public void _add(String JavaDoc fqn, Serializable JavaDoc element) {
361         Node curr, n;
362         StringTokenizer JavaDoc tok;
363         String JavaDoc child_name;
364         String JavaDoc tmp_fqn="";
365
366         if(root == null) {
367             root=new Node("/", null);
368             notifyNodeAdded("/", null);
369         }
370         if(fqn == null)
371             return;
372         curr=root;
373         tok=new StringTokenizer JavaDoc(fqn, "/");
374
375         while(tok.hasMoreTokens()) {
376             child_name=tok.nextToken();
377             tmp_fqn=tmp_fqn + '/' + child_name;
378             n=curr.findChild(child_name);
379             if(n == null) {
380                 n=new Node(child_name, null);
381                 curr.addChild(n);
382                 if(!tok.hasMoreTokens()) {
383                     n.element=element;
384                     notifyNodeAdded(tmp_fqn, element);
385                     return;
386                 }
387                 else
388                     notifyNodeAdded(tmp_fqn, null);
389             }
390             curr=n;
391         }
392         curr.element=element;
393         notifyNodeModified(fqn, null, element);
394     }
395
396
397     public void _remove(String JavaDoc fqn) {
398         Node curr, n;
399         StringTokenizer JavaDoc tok;
400         String JavaDoc child_name=null;
401
402         if(fqn == null || root == null)
403             return;
404         curr=root;
405         tok=new StringTokenizer JavaDoc(fqn, "/");
406
407         while(tok.countTokens() > 1) {
408             child_name=tok.nextToken();
409             n=curr.findChild(child_name);
410             if(n == null) // node does not exist
411
return;
412             curr=n;
413         }
414         try {
415             child_name=tok.nextToken();
416             if(child_name != null) {
417                 n=curr.removeChild(child_name);
418                 if(n != null)
419                     notifyNodeRemoved(fqn);
420             }
421         }
422         catch(Exception JavaDoc ex) {
423         }
424     }
425
426
427     public void _set(String JavaDoc fqn, Serializable JavaDoc element) {
428         Node n;
429         Serializable JavaDoc old_el=null;
430
431         if(fqn == null || element == null) return;
432         n=findNode(fqn);
433         if(n == null) {
434             if(log.isErrorEnabled()) log.error("node " + fqn + " not found");
435             return;
436         }
437         old_el=n.element;
438         n.element=element;
439         notifyNodeModified(fqn, old_el, element);
440     }
441
442     /** similar to set, but does not error if node does not exist, but rather does an add instead */
443     public void _reset(String JavaDoc fqn, Serializable JavaDoc element) {
444         Node n;
445         Serializable JavaDoc old_el=null;
446
447         if(fqn == null || element == null) return;
448         n=findNode(fqn);
449         if(n == null) {
450             _add(fqn, element);
451         }
452         old_el=n.element;
453         n.element=element;
454         notifyNodeModified(fqn, old_el, element);
455     }
456
457     /*----------------- End of Callbacks ----------------------*/
458
459
460
461
462
463
464     /*-------------------- State Exchange ----------------------*/
465
466     public void receive(Message msg) {
467     }
468
469     /** Return a copy of the tree */
470     public byte[] getState() {
471         Object JavaDoc copy=root != null? root.copy() : null;
472         try {
473             return Util.objectToByteBuffer(copy);
474         }
475         catch(Throwable JavaDoc ex) {
476             if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex);
477             return null;
478         }
479     }
480
481     public void setState(byte[] data) {
482         Object JavaDoc new_state;
483
484         try {
485             new_state=Util.objectFromByteBuffer(data);
486         }
487         catch(Throwable JavaDoc ex) {
488             if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex);
489             return;
490         }
491         if(new_state == null) return;
492         if(!(new_state instanceof Node)) {
493             if(log.isErrorEnabled()) log.error("object is not of type 'Node'");
494             return;
495         }
496         root=((Node)new_state).copy();
497     }
498
499
500
501     /*------------------- Membership Changes ----------------------*/
502
503     public void viewAccepted(View new_view) {
504         Vector JavaDoc new_mbrs=new_view.getMembers();
505
506         if(new_mbrs != null) {
507             sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
508
members.removeAllElements();
509             for(int i=0; i < new_mbrs.size(); i++)
510                 members.addElement(new_mbrs.elementAt(i));
511         }
512         //if size is bigger than one, there are more peers in the group
513
//otherwise there is only one server.
514
if(members.size() > 1) {
515             send_message=true;
516         }
517         else {
518             send_message=false;
519         }
520     }
521
522
523     /** Called when a member is suspected */
524     public void suspect(Address suspected_mbr) {
525     }
526
527
528     /** Block sending and receiving of messages until ViewAccepted is called */
529     public void block() {
530     }
531
532
533     void sendViewChangeNotifications(Vector JavaDoc new_mbrs, Vector JavaDoc old_mbrs) {
534         Vector JavaDoc joined, left;
535         Object JavaDoc mbr;
536
537         if(view_listeners.size() == 0 || old_mbrs == null || new_mbrs == null)
538             return;
539
540
541         // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
542
joined=new Vector JavaDoc();
543         for(int i=0; i < new_mbrs.size(); i++) {
544             mbr=new_mbrs.elementAt(i);
545             if(!old_mbrs.contains(mbr))
546                 joined.addElement(mbr);
547         }
548
549
550         // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
551
left=new Vector JavaDoc();
552         for(int i=0; i < old_mbrs.size(); i++) {
553             mbr=old_mbrs.elementAt(i);
554             if(!new_mbrs.contains(mbr))
555                 left.addElement(mbr);
556         }
557         notifyViewChange(joined, left);
558     }
559
560
561     Node findNode(String JavaDoc fqn) {
562         Node curr=root;
563         StringTokenizer JavaDoc tok;
564         String JavaDoc child_name;
565
566         if(fqn == null || root == null) return null;
567         if("/".equals(fqn) || "".equals(fqn))
568             return root;
569
570         tok=new StringTokenizer JavaDoc(fqn, "/");
571         while(tok.hasMoreTokens()) {
572             child_name=tok.nextToken();
573             curr=curr.findChild(child_name);
574             if(curr == null) return null;
575         }
576         return curr;
577     }
578
579
580     void notifyNodeAdded(String JavaDoc fqn, Serializable JavaDoc element) {
581         for(int i=0; i < listeners.size(); i++)
582             ((DistributedTreeListener)listeners.elementAt(i)).nodeAdded(fqn, element);
583     }
584
585     void notifyNodeRemoved(String JavaDoc fqn) {
586         for(int i=0; i < listeners.size(); i++)
587             ((DistributedTreeListener)listeners.elementAt(i)).nodeRemoved(fqn);
588     }
589
590     void notifyNodeModified(String JavaDoc fqn, Serializable JavaDoc old_element, Serializable JavaDoc new_element) {
591         for(int i=0; i < listeners.size(); i++)
592             ((DistributedTreeListener)listeners.elementAt(i)).nodeModified(fqn, old_element, new_element);
593     }
594
595     /** Generates NodeAdded notifications for all nodes of the tree. This is called whenever the tree is
596      initially retrieved (state transfer) */

597     void notifyAllNodesCreated(Node curr, String JavaDoc tmp_fqn) {
598         Node n;
599
600         if(curr == null) return;
601         if(curr.name == null) {
602             if(log.isErrorEnabled()) log.error("curr.name is null");
603             return;
604         }
605
606         if(curr.children != null) {
607             for(int i=0; i < curr.children.size(); i++) {
608                 n=(Node)curr.children.elementAt(i);
609                 System.out.println("*** nodeCreated(): tmp_fqn is " + tmp_fqn);
610                 notifyNodeAdded(tmp_fqn, n.element);
611                 notifyAllNodesCreated(n, tmp_fqn + '/' + n.name);
612             }
613         }
614     }
615
616
617     void notifyViewChange(Vector JavaDoc new_mbrs, Vector JavaDoc old_mbrs) {
618         for(int i=0; i < view_listeners.size(); i++)
619             ((ViewListener)view_listeners.elementAt(i)).viewChange(new_mbrs, old_mbrs);
620     }
621
622
623     private static class Node implements Serializable JavaDoc {
624         String JavaDoc name=null;
625         Vector JavaDoc children=null;
626         Serializable JavaDoc element=null;
627
628
629         Node() {
630         }
631
632         Node(String JavaDoc name, Serializable JavaDoc element) {
633             this.name=name;
634             this.element=element;
635         }
636
637
638         void addChild(String JavaDoc relative_name, Serializable JavaDoc element) {
639             if(relative_name == null)
640                 return;
641             if(children == null)
642                 children=new Vector JavaDoc();
643             else {
644                 if(!children.contains(relative_name))
645                     children.addElement(new Node(relative_name, element));
646             }
647         }
648
649
650         void addChild(Node n) {
651             if(n == null) return;
652             if(children == null)
653                 children=new Vector JavaDoc();
654             if(!children.contains(n))
655                 children.addElement(n);
656         }
657
658
659         Node removeChild(String JavaDoc rel_name) {
660             Node n=findChild(rel_name);
661
662             if(n != null)
663                 children.removeElement(n);
664             return n;
665         }
666
667
668         Node findChild(String JavaDoc relative_name) {
669             Node child;
670
671             if(children == null || relative_name == null)
672                 return null;
673             for(int i=0; i < children.size(); i++) {
674                 child=(Node)children.elementAt(i);
675                 if(child.name == null) {
676                     if(log.isErrorEnabled()) log.error("child.name is null for " + relative_name);
677                     continue;
678                 }
679
680                 if(child.name.equals(relative_name))
681                     return child;
682             }
683
684             return null;
685         }
686
687
688         public boolean equals(Object JavaDoc other) {
689             return other != null && ((Node)other).name != null && name != null && name.equals(((Node)other).name);
690         }
691
692
693         Node copy() {
694             Node ret=new Node(name, element);
695
696             if(children != null)
697                 ret.children=(Vector JavaDoc)children.clone();
698             return ret;
699         }
700
701
702         String JavaDoc print(int indent) {
703             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
704             boolean is_root=name != null && "/".equals(name);
705
706             for(int i=0; i < indent; i++)
707                 sb.append(' ');
708             if(!is_root) {
709                 if(name == null)
710                     sb.append("/<unnamed>");
711                 else {
712                     sb.append('/' + name);
713                     // if(element != null) sb.append(" --> " + element);
714
}
715             }
716             sb.append('\n');
717             if(children != null) {
718                 if(is_root)
719                     indent=0;
720                 else
721                     indent+=4;
722                 for(int i=0; i < children.size(); i++)
723                     sb.append(((Node)children.elementAt(i)).print(indent));
724             }
725             return sb.toString();
726         }
727
728
729         public String JavaDoc toString() {
730             if(element != null)
731                 return "[name: " + name + ", element: " + element + ']';
732             else
733                 return "[name: " + name + ']';
734         }
735
736     }
737
738
739 }
740
741
742
Popular Tags