KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > DistributedHashtable


1 // $Id: DistributedHashtable.java,v 1.18 2005/02/19 13:23:34 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.*;
8 import org.jgroups.persistence.CannotPersistException;
9 import org.jgroups.persistence.CannotRemoveException;
10 import org.jgroups.persistence.PersistenceFactory;
11 import org.jgroups.persistence.PersistenceManager;
12 import org.jgroups.util.Promise;
13 import org.jgroups.util.Util;
14
15 import java.io.Serializable JavaDoc;
16 import java.util.*;
17
18
19
20
21
/**
 * Provides the abstraction of a java.util.Hashtable that is replicated at several
 * locations. Any change to the hashtable (clear, put, remove etc) will transparently be
 * propagated to all replicas in the group. All read-only methods will always access the
 * local replica.<p>
 * Both keys and values added to the hashtable <em>must be serializable</em>, the reason
 * being that they will be sent across the network to all replicas of the group. Having said
 * this, it is now for example possible to add RMI remote objects to the hashtable as they
 * are derived from <code>java.rmi.server.RemoteObject</code> which in turn is serializable.
 * This allows looking up shared distributed objects by their name and invoking methods on them,
 * regardless of one's own location. A <code>DistributedHashtable</code> thus allows
 * implementing a distributed naming service in just a couple of lines.<p>
 * An instance of this class will contact an existing member of the group to fetch its
 * initial state (using the state exchange funclet <code>StateExchangeFunclet</code>).
 * @author Bela Ban
 * @author <a HREF="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a>
 * @version $Id: DistributedHashtable.java,v 1.18 2005/02/19 13:23:34 belaban Exp $
 * @deprecated This class is unsupported; use JBossCache instead: http://www.jboss.com/products/jbosscache
 */

41 public class DistributedHashtable extends Hashtable implements MessageListener, MembershipListener {
42
43
44
45     public interface Notification {
46         void entrySet(Object JavaDoc key, Object JavaDoc value);
47         void entryRemoved(Object JavaDoc key);
48         void viewChange(Vector new_mbrs, Vector old_mbrs);
49         void contentsSet(Map new_entries);
50         void contentsCleared();
51     }
52
53
54     private transient Channel channel;
55     protected transient RpcDispatcher disp=null;
56     private transient String JavaDoc groupname=null;
57     private final transient Vector notifs=new Vector(); // to be notified when mbrship changes
58
private final transient Vector members=new Vector(); // keeps track of all DHTs
59
private transient Class JavaDoc[] put_signature=null;
60     private transient Class JavaDoc[] putAll_signature=null;
61     private transient Class JavaDoc[] clear_signature=null;
62     private transient Class JavaDoc[] remove_signature=null;
63     private transient boolean persistent=false; // whether to use PersistenceManager to save state
64
private transient PersistenceManager persistence_mgr=null;
65
66     /** Determines when the updates have to be sent across the network, avoids sending unnecessary
67      * messages when there are no member in the group */

68     private transient boolean send_message = false;
69
70     protected final transient Promise state_promise=new Promise();
71
72     protected final Log log=LogFactory.getLog(this.getClass());
73
74
75
76
77     /**
78      * Creates a DistributedHashtable
79      * @param groupname The name of the group to join
80      * @param factory The ChannelFactory which will be used to create a channel
81      * @param properties The property string to be used to define the channel
82      * @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever.
83      */

84     public DistributedHashtable(String JavaDoc groupname, ChannelFactory factory,
85                                 String JavaDoc properties, long state_timeout)
86             throws ChannelException {
87         this.groupname=groupname;
88         initSignatures();
89         channel=factory != null ? factory.createChannel(properties) : new JChannel(properties);
90         disp=new RpcDispatcher(channel, this, this, this);
91         channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
92         channel.connect(groupname);
93         start(state_timeout);
94     }
95
96     /**
97      * Creates a DisttributedHashtable. Optionally the contents can be saved to
98      * persistemt storage using the {@link PersistenceManager}.
99      * @param groupname Name of the group to join
100      * @param factory Instance of a ChannelFactory to create the channel
101      * @param properties Protocol stack properties
102      * @param persistent Whether the contents should be persisted
103      * @param state_timeout Max number of milliseconds to wait until state is
104      * retrieved
105      */

106     public DistributedHashtable(String JavaDoc groupname, ChannelFactory factory, String JavaDoc properties,
107                                 boolean persistent, long state_timeout)
108             throws ChannelException {
109         this.groupname=groupname;
110         this.persistent=persistent;
111         initSignatures();
112         channel=factory != null ? factory.createChannel(properties) : new JChannel(properties);
113         disp=new RpcDispatcher(channel, this, this, this);
114         channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
115         channel.connect(groupname);
116         start(state_timeout);
117     }
118
119
120     public DistributedHashtable(JChannel channel, long state_timeout)
121         throws ChannelNotConnectedException, ChannelClosedException {
122         this(channel, false, state_timeout);
123     }
124
125
126     public DistributedHashtable(JChannel channel, boolean persistent, long state_timeout)
127         throws ChannelNotConnectedException, ChannelClosedException {
128         this.groupname = channel.getChannelName();
129         this.channel = channel;
130         this.persistent=persistent;
131         init(state_timeout);
132     }
133
134     /**
135      * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
136      * used to register under that id. This is typically used when another building block is already using
137      * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
138      * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
139      * first block created on PullPushAdapter.
140      * @param adapter The PullPushAdapter which to use as underlying transport
141      * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
142      * requests/responses for different building blocks on top of PullPushAdapter.
143      * @param state_timeout Max number of milliseconds to wait until state is
144      * retrieved
145      */

146     public DistributedHashtable(PullPushAdapter adapter, Serializable JavaDoc id, long state_timeout)
147         throws ChannelNotConnectedException, ChannelClosedException {
148         initSignatures();
149         this.channel = (Channel)adapter.getTransport();
150         this.groupname = this.channel.getChannelName();
151         disp=new RpcDispatcher(adapter, id, this, this, this);
152         channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
153         start(state_timeout);
154     }
155
156     public DistributedHashtable(PullPushAdapter adapter, Serializable JavaDoc id) {
157         initSignatures();
158         this.channel = (Channel)adapter.getTransport();
159         this.groupname = this.channel.getChannelName();
160         disp=new RpcDispatcher(adapter, id, this, this, this);
161         channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
162     }
163
164     protected void init(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException {
165         initSignatures();
166         channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
167         disp = new RpcDispatcher(channel, this, this, this);
168
169         // Changed by bela (jan 20 2003): start() has to be called by user (only when providing
170
// own channel). First, Channel.connect() has to be called, then start().
171
// start(state_timeout);
172
}
173
174
175     /**
176      * Fetches the state
177      * @param state_timeout
178      * @throws ChannelClosedException
179      * @throws ChannelNotConnectedException
180      */

181     public void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException {
182         boolean rc;
183         if(persistent) {
184             if(log.isInfoEnabled()) log.info("fetching state from database");
185             try {
186                 persistence_mgr=PersistenceFactory.getInstance().createManager();
187             }
188             catch(Throwable JavaDoc ex) {
189                 if(log.isErrorEnabled()) log.error("failed creating PersistenceManager, " +
190                             "turning persistency off. Exception: " + Util.printStackTrace(ex));
191                 persistent=false;
192             }
193         }
194
195         state_promise.reset();
196         rc=channel.getState(null, state_timeout);
197         if(rc) {
198             if(log.isInfoEnabled()) log.info("state was retrieved successfully, waiting for setState()");
199             Boolean JavaDoc result=(Boolean JavaDoc)state_promise.getResult(state_timeout);
200             if(result == null) {
201                 if(log.isErrorEnabled()) log.error("setState() never got called");
202             }
203             else {
204                 if(log.isInfoEnabled()) log.info("setState() was called");
205             }
206         }
207         else {
208             if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)");
209             if(persistent) {
210                 if(log.isInfoEnabled()) log.info("fetching state from database");
211                 try {
212                     Map m=persistence_mgr.retrieveAll();
213                     if(m != null) {
214                         Map.Entry entry;
215                         Object JavaDoc key, val;
216                         for(Iterator it=m.entrySet().iterator(); it.hasNext();) {
217                             entry=(Map.Entry)it.next();
218                             key=entry.getKey();
219                             val=entry.getValue();
220
221                                 if(log.isInfoEnabled()) log.info("inserting " + key +
222                                            " --> " + val);
223                             put(key, val); // will replicate key and value
224
}
225                     }
226                 }
227                 catch(Throwable JavaDoc ex) {
228                     if(log.isErrorEnabled()) log.error("failed creating PersistenceManager, " +
229                                 "turning persistency off. Exception: " + Util.printStackTrace(ex));
230                     persistent=false;
231                 }
232             }
233         }
234     }
235
236
237     public Address getLocalAddress() {return channel != null ? channel.getLocalAddress() : null;}
238     public String JavaDoc getGroupName() {return groupname;}
239     public Channel getChannel() {return channel;}
240     public boolean getPersistent() {return persistent;}
241     public void setPersistent(boolean p) {persistent=p;}
242
243     public void addNotifier(Notification n) {
244         if(!notifs.contains(n))
245             notifs.addElement(n);
246     }
247
248     public void removeNotifier(Notification n) {
249         if(notifs.contains(n))
250             notifs.removeElement(n);
251     }
252
253     public void stop() {
254         if(disp != null) {
255             disp.stop();
256             disp=null;
257         }
258         if(channel != null) {
259             channel.close();
260             channel=null;
261         }
262     }
263
264
265     /**
266      * Maps the specified key to the specified value in the hashtable. Neither of both parameters can be null
267      * @param key - the hashtable key
268      * @param value - the value
269      * @return the previous value of the specified key in this hashtable, or null if it did not have one
270      */

271     public Object JavaDoc put(Object JavaDoc key, Object JavaDoc value) {
272         Object JavaDoc prev_val=get(key);
273
274         //Changes done by <aos>
275
//if true, propagate action to the group
276
if(send_message == true) {
277             try {
278                 disp.callRemoteMethods(
279                         null, "_put", new Object JavaDoc[]{key,value},
280                         put_signature,
281                         GroupRequest.GET_ALL,
282                         0);
283             }
284             catch(Exception JavaDoc e) {
285                 //return null;
286
}
287         }
288         else {
289             _put(key, value);
290             //don't have to do prev_val = super.put(..) as is done at the beginning
291
}
292         return prev_val;
293     }
294
295     /**
296      * Copies all of the mappings from the specified Map to this Hashtable These mappings will replace any mappings that this Hashtable had for any of the keys currently in the specified Map.
297      * @param m - Mappings to be stored in this map
298      */

299     public void putAll(Map m) {
300         //Changes done by <aos>
301
//if true, propagate action to the group
302
if(send_message == true) {
303             try {
304                 disp.callRemoteMethods(
305                         null, "_putAll", new Object JavaDoc[]{m},
306                         putAll_signature,
307                         GroupRequest.GET_ALL,
308                         0);
309             }
310             catch(Throwable JavaDoc t) {
311             }
312         }
313         else {
314             _putAll(m);
315         }
316     }
317
318     /**
319      * Clears this hashtable so that it contains no keys
320      */

321     public synchronized void clear() {
322         //Changes done by <aos>
323
//if true, propagate action to the group
324
if(send_message == true) {
325             try {
326                 disp.callRemoteMethods(
327                         null, "_clear", null,
328                         clear_signature,
329                         GroupRequest.GET_ALL,
330                         0);
331             }
332             catch(Exception JavaDoc e) {
333                 if(log.isErrorEnabled()) log.error("exception=" + e);
334             }
335         }
336         else {
337             _clear();
338         }
339     }
340
341     /**
342      * Removes the key (and its corresponding value) from the Hashtable.
343      * @param key - the key to be removed.
344      * @return the value to which the key had been mapped in this hashtable, or null if the key did not have a mapping.
345      */

346     public Object JavaDoc remove(Object JavaDoc key) {
347         Object JavaDoc retval = get(key);
348
349         //Changes done by <aos>
350
//if true, propagate action to the group
351
if(send_message == true) {
352             try {
353                 disp.callRemoteMethods(
354                         null, "_remove", new Object JavaDoc[]{key},
355                         remove_signature,
356                         GroupRequest.GET_ALL,
357                         0);
358                 //return retval;
359
}
360             catch(Exception JavaDoc e) {
361                 //return null;
362
}
363         }
364         else {
365             _remove(key);
366             //don't have to do retval = super.remove(..) as is done at the beginning
367
}
368         return retval;
369     }
370
371
372
373     /*------------------------ Callbacks -----------------------*/
374
375     public Object JavaDoc _put(Object JavaDoc key, Object JavaDoc value) {
376         Object JavaDoc retval=super.put(key, value);
377         if(persistent) {
378             try {
379                 persistence_mgr.save((Serializable JavaDoc)key, (Serializable JavaDoc)value);
380             }
381             catch(CannotPersistException cannot_persist_ex) {
382                 if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " +
383                             value + ", exception=" + cannot_persist_ex);
384             }
385             catch(Throwable JavaDoc t) {
386                 if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " +
387                             value + ", exception=" + Util.printStackTrace(t));
388             }
389         }
390         for(int i=0; i < notifs.size(); i++)
391             ((Notification)notifs.elementAt(i)).entrySet(key, value);
392         return retval;
393     }
394
395
396     /**
397      * @see java.util.Map#putAll(java.util.Map)
398      */

399     public void _putAll(Map m) {
400         if (m == null)
401             return;
402
403         // Calling the method below seems okay, but would result in ... deadlock !
404
// The reason is that Map.putAll() calls put(), which we override, which results in
405
// lock contention for the map.
406

407         // ---> super.putAll(m); <--- CULPRIT !!!@#$%$
408

409         // That said let's do it the stupid way:
410
Map.Entry entry;
411         for(Iterator it=m.entrySet().iterator(); it.hasNext();) {
412             entry=(Map.Entry)it.next();
413             super.put(entry.getKey(), entry.getValue());
414         }
415
416         if (persistent) {
417             try {
418                 persistence_mgr.saveAll(m);
419             }
420             catch (CannotPersistException persist_ex) {
421                 if(log.isErrorEnabled()) log.error("failed persisting contents: " + persist_ex);
422             }
423             catch (Throwable JavaDoc t) {
424                 if(log.isErrorEnabled()) log.error("failed persisting contents: " + t);
425             }
426         }
427         for(int i=0; i < notifs.size(); i++)
428             ((Notification)notifs.elementAt(i)).contentsSet(m);
429     }
430
431
432     public void _clear() {
433         super.clear();
434         if(persistent) {
435             try {
436                 persistence_mgr.clear();
437             }
438             catch(CannotRemoveException cannot_remove_ex) {
439                 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex);
440             }
441             catch(Throwable JavaDoc t) {
442                 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t);
443             }
444         }
445         for(int i=0; i < notifs.size(); i++)
446             ((Notification)notifs.elementAt(i)).contentsCleared();
447     }
448
449
450     public Object JavaDoc _remove(Object JavaDoc key) {
451         Object JavaDoc retval=super.remove(key);
452         if(persistent) {
453             try {
454                 persistence_mgr.remove((Serializable JavaDoc)key);
455             }
456             catch(CannotRemoveException cannot_remove_ex) {
457                 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex);
458             }
459             catch(Throwable JavaDoc t) {
460                 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t);
461             }
462         }
463         for(int i=0; i < notifs.size(); i++)
464             ((Notification)notifs.elementAt(i)).entryRemoved(key);
465
466         return retval;
467     }
468
469     /*----------------------------------------------------------*/
470
471
472
473     /*-------------------- State Exchange ----------------------*/
474
475     public void receive(Message msg) { }
476
477     public byte[] getState() {
478         Object JavaDoc key, val;
479         Hashtable copy=new Hashtable();
480
481         for(Enumeration e=keys(); e.hasMoreElements();) {
482             key=e.nextElement();
483             val=get(key);
484             copy.put(key, val);
485         }
486         try {
487             return Util.objectToByteBuffer(copy);
488         }
489         catch(Throwable JavaDoc ex) {
490             if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex);
491             return null;
492         }
493     }
494
495
496     public void setState(byte[] new_state) {
497         Hashtable new_copy;
498
499         try {
500             new_copy=(Hashtable)Util.objectFromByteBuffer(new_state);
501             if(new_copy == null)
502                 return;
503         }
504         catch(Throwable JavaDoc ex) {
505             if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex);
506             return;
507         }
508         _putAll(new_copy);
509         state_promise.setResult(Boolean.TRUE);
510     }
511
512
513
514     /*------------------- Membership Changes ----------------------*/
515
516     public void viewAccepted(View new_view) {
517         Vector new_mbrs=new_view.getMembers();
518
519         if(new_mbrs != null) {
520             sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
521
members.removeAllElements();
522             for(int i=0; i < new_mbrs.size(); i++)
523                 members.addElement(new_mbrs.elementAt(i));
524         }
525         //if size is bigger than one, there are more peers in the group
526
//otherwise there is only one server.
527
if(members.size() > 1) {
528             send_message=true;
529         }
530         else {
531             send_message=false;
532         }
533     }
534
535
536     /** Called when a member is suspected */
537     public void suspect(Address suspected_mbr) {
538         ;
539     }
540
541
542     /** Block sending and receiving of messages until ViewAccepted is called */
543     public void block() {}
544
545
546
547     void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) {
548         Vector joined, left;
549         Object JavaDoc mbr;
550         Notification n;
551
552         if(notifs.size() == 0 || old_mbrs == null || new_mbrs == null ||
553            old_mbrs.size() == 0 || new_mbrs.size() == 0)
554             return;
555
556
557         // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
558
joined=new Vector();
559         for(int i=0; i < new_mbrs.size(); i++) {
560             mbr=new_mbrs.elementAt(i);
561             if(!old_mbrs.contains(mbr))
562                 joined.addElement(mbr);
563         }
564
565
566         // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
567
left=new Vector();
568         for(int i=0; i < old_mbrs.size(); i++) {
569             mbr=old_mbrs.elementAt(i);
570             if(!new_mbrs.contains(mbr)) {
571                 left.addElement(mbr);
572             }
573         }
574
575         for(int i=0; i < notifs.size(); i++) {
576             n=(Notification)notifs.elementAt(i);
577             n.viewChange(joined, left);
578         }
579     }
580
581
582     void initSignatures() {
583         try {
584             if(put_signature == null) {
585                 put_signature=new Class JavaDoc[] {Object JavaDoc.class,Object JavaDoc.class};
586             }
587
588             if(putAll_signature == null) {
589                 putAll_signature=new Class JavaDoc[] {Map.class};
590             }
591
592             if(clear_signature == null)
593                 clear_signature=new Class JavaDoc[0];
594
595             if(remove_signature == null) {
596                 remove_signature=new Class JavaDoc[] {Object JavaDoc.class};
597             }
598         }
599         catch(Throwable JavaDoc ex) {
600             if(log.isErrorEnabled()) log.error("exception=" + ex);
601         }
602     }
603
604     public static void main(String JavaDoc[] args) {
605         try {
606             // The setup here is kind of weird:
607
// 1. Create a channel
608
// 2. Create a DistributedHashtable (on the channel)
609
// 3. Connect the channel (so the HT gets a VIEW_CHANGE)
610
// 4. Start the HT
611
//
612
// A simpler setup is
613
// DistributedHashtable ht = new DistributedHashtable("demo", null,
614
// "file://c:/JGroups-2.0/conf/state_transfer.xml", 5000);
615

616             JChannel c = new JChannel("file:/c:/JGroups-2.0/conf/state_transfer.xml");
617             c.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
618             DistributedHashtable ht = new DistributedHashtable(c, false, 5000);
619             c.connect("demo");
620             ht.start(5000);
621
622
623
624             ht.put("name", "Michelle Ban");
625             Object JavaDoc old_key = ht.remove("name");
626             System.out.println("old key was " + old_key);
627             ht.put("newkey", "newvalue");
628
629             Map m = new HashMap();
630             m.put("k1", "v1");
631             m.put("k2", "v2");
632
633             ht.putAll(m);
634
635             System.out.println("hashmap is " + ht);
636         }
637         catch (Throwable JavaDoc t) {
638             t.printStackTrace();
639         }
640     }
641
642 }
643
644
Popular Tags