KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > ReplicatedHashMap


1 package org.jgroups.blocks;
2
3 import org.apache.commons.logging.Log;
4 import org.apache.commons.logging.LogFactory;
5 import org.jgroups.*;
6 import org.jgroups.persistence.CannotPersistException;
7 import org.jgroups.persistence.CannotRemoveException;
8 import org.jgroups.persistence.PersistenceFactory;
9 import org.jgroups.persistence.PersistenceManager;
10 import org.jgroups.util.Promise;
11 import org.jgroups.util.Util;
12
13 import java.io.*;
14 import java.util.HashMap JavaDoc;
15 import java.util.Map JavaDoc;
16 import java.util.Vector JavaDoc;
17 import java.lang.reflect.Method JavaDoc;
18
19
20 /**
21  * Provides the abstraction of a java.util.HashMap that is replicated across a cluster.
22  * Any change to the hashmap (clear(), put(), remove() etc) will transparently be
23  * propagated to all replicas in the group. All read-only methods will always access the local replica.<p>
24  * Both keys and values added to the hashmap <em>must be serializable</em>, the reason
25  * being that they will be sent across the network to all replicas of the group. Having said
26  * this, it is now for example possible to add RMI remote objects to the hashtable as they
27  * are derived from <code>java.rmi.server.RemoteObject</code> which in turn is serializable.
28  * This allows to lookup shared distributed objects by their name and invoke methods on them,
29  * regardless of one's onw location. A <code>DistributedHashMap</code> thus allows to
30  * implement a distributed naming service in just a couple of lines.<p>
31  * An instance of this class will contact an existing member of the group to fetch its
32  * initial state (using the state exchange funclet <code>StateExchangeFunclet</code>.<p>
33  * This class combines both {@link org.jgroups.blocks.ReplicatedHashtable} (asynchronous replication) and
34  * {@link org.jgroups.blocks.DistributedHashtable} (synchronous replication) into one class
35  * @author Bela Ban
36  * @version $Id: ReplicatedHashMap.java,v 1.4 2007/07/06 07:44:40 belaban Exp $
37  */

38 public class ReplicatedHashMap<K extends Serializable,V extends Serializable> extends HashMap JavaDoc<K,V> implements ExtendedMessageListener, ExtendedMembershipListener {
39
40
41
42
43     public interface Notification {
44         void entrySet(Object JavaDoc key, Object JavaDoc value);
45
46         void entryRemoved(Object JavaDoc key);
47
48         void viewChange(View view, Vector JavaDoc<Address> new_mbrs, Vector JavaDoc<Address> old_mbrs);
49
50         void contentsSet(Map JavaDoc new_entries);
51
52         void contentsCleared();
53     }
54
55     protected static Map JavaDoc<Short JavaDoc, Method JavaDoc> methods;
56
57     static {
58         try {
59             methods=new HashMap JavaDoc<Short JavaDoc,Method JavaDoc>(10);
60             methods.put(new Short JavaDoc((short)1), ReplicatedHashMap.class.getMethod("_put", Serializable.class, Serializable.class));
61             methods.put(new Short JavaDoc((short)2), ReplicatedHashMap.class.getMethod("_putAll", Map JavaDoc.class));
62             methods.put(new Short JavaDoc((short)3), ReplicatedHashMap.class.getMethod("_remove", Object JavaDoc.class));
63             methods.put(new Short JavaDoc((short)4), ReplicatedHashMap.class.getMethod("_clear"));
64         }
65         catch(NoSuchMethodException JavaDoc e) {
66             throw new RuntimeException JavaDoc(e);
67         }
68     }
69
70     private transient Channel channel;
71     protected transient RpcDispatcher disp=null;
72     private String JavaDoc cluster_name=null;
73     private final transient Vector JavaDoc<Notification> notifs=new Vector JavaDoc<Notification>(); // to be notified when mbrship changes
74
private final Vector JavaDoc<Address> members=new Vector JavaDoc<Address>(); // keeps track of all DHTs
75
private transient boolean persistent=false; // whether to use PersistenceManager to save state
76
private transient PersistenceManager persistence_mgr=null;
77
78     /**
79      * Determines when the updates have to be sent across the network, avoids sending unnecessary
80      * messages when there are no member in the group
81      */

82     private transient boolean send_message=false;
83
84     protected final transient Promise state_promise=new Promise();
85
86     /**
87      * Whether updates across the cluster should be asynchronous (default) or synchronous)
88      */

89     protected int update_mode=GroupRequest.GET_NONE;
90
91     /**
92      * For blocking updates only: the max time to wait (0 == forever)
93      */

94     protected long timeout=5000;
95
96     protected final Log log=LogFactory.getLog(this.getClass());
97
98
99     /**
100      * Creates a ReplicatedHashMap
101      * @param clustername The name of the group to join
102      * @param factory The ChannelFactory which will be used to create a channel
103      * @param properties The property string to be used to define the channel. This will override the properties of
104      * the factory. If null, then the factory properties will be used
105      * @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever.
106      */

107     public ReplicatedHashMap(String JavaDoc clustername, ChannelFactory factory, String JavaDoc properties, long state_timeout)
108             throws ChannelException {
109         this.cluster_name=clustername;
110         if(factory != null) {
111             channel=properties != null? factory.createChannel(properties) : factory.createChannel();
112         }
113         else {
114             channel=new JChannel(properties);
115         }
116         disp=new RpcDispatcher(channel, this, this, this);
117         disp.setMethodLookup(new MethodLookup() {
118             public Method JavaDoc findMethod(short id) {
119                 return methods.get(id);
120             }
121         });
122         channel.connect(clustername);
123         start(state_timeout);
124     }
125
126     /**
127      * Creates a ReplicatedHashMap. Optionally the contents can be saved to
128      * persistemt storage using the {@link org.jgroups.persistence.PersistenceManager}.
129      * @param clustername Name of the group to join
130      * @param factory Instance of a ChannelFactory to create the channel
131      * @param properties Protocol stack properties. This will override the properties of the factory. If
132      * null, then the factory properties will be used
133      * @param persistent Whether the contents should be persisted
134      * @param state_timeout Max number of milliseconds to wait until state is
135      * retrieved
136      */

137     public ReplicatedHashMap(String JavaDoc clustername, ChannelFactory factory, String JavaDoc properties,
138                              boolean persistent, long state_timeout)
139             throws ChannelException {
140         this.cluster_name=clustername;
141         this.persistent=persistent;
142         if(factory != null) {
143             channel=properties != null? factory.createChannel(properties) : factory.createChannel();
144         }
145         else {
146             channel=new JChannel(properties);
147         }
148         disp=new RpcDispatcher(channel, this, this, this);
149         disp.setMethodLookup(new MethodLookup() {
150             public Method JavaDoc findMethod(short id) {
151                 return methods.get(id);
152             }
153         });
154         channel.connect(clustername);
155         start(state_timeout);
156     }
157
158
159     public ReplicatedHashMap(Channel channel, long state_timeout) {
160         this(channel, false, state_timeout);
161     }
162
163
164     public ReplicatedHashMap(Channel channel, boolean persistent, long state_timeout) {
165         this.cluster_name=channel.getClusterName();
166         this.channel=channel;
167         this.persistent=persistent;
168         init(state_timeout);
169     }
170
171
172     protected final void init(long state_timeout) {
173         disp=new RpcDispatcher(channel, this, this, this);
174         disp.setMethodLookup(new MethodLookup() {
175             public Method JavaDoc findMethod(short id) {
176                 return methods.get(id);
177             }
178         });
179
180         // Changed by bela (jan 20 2003): start() has to be called by user (only when providing
181
// own channel). First, Channel.connect() has to be called, then start().
182
// start(state_timeout);
183
}
184
185
186     public boolean isBlockingUpdates() {
187         return update_mode == GroupRequest.GET_ALL;
188     }
189
190     /**
191      * Whether updates across the cluster should be asynchronous (default) or synchronous)
192      * @param blocking_updates
193      */

194     public void setBlockingUpdates(boolean blocking_updates) {
195         this.update_mode=blocking_updates? GroupRequest.GET_ALL : GroupRequest.GET_NONE;
196     }
197
198     /**
199      * The timeout (in milliseconds) for blocking updates
200      */

201     public long getTimeout() {
202         return timeout;
203     }
204
205     /**
206      * Sets the cluster call timeout (until all acks have been received)
207      * @param timeout The timeout (in milliseconds) for blocking updates
208      */

209     public void setTimeout(long timeout) {
210         this.timeout=timeout;
211     }
212
213     /**
214      * Fetches the state
215      * @param state_timeout
216      * @throws org.jgroups.ChannelClosedException
217      *
218      * @throws org.jgroups.ChannelNotConnectedException
219      *
220      */

221     public final void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException {
222         boolean rc;
223         if(persistent) {
224             if(log.isInfoEnabled()) log.info("fetching state from database");
225             try {
226                 persistence_mgr=PersistenceFactory.getInstance().createManager();
227             }
228             catch(Throwable JavaDoc ex) {
229                 if(log.isErrorEnabled()) log.error("failed creating PersistenceManager, " +
230                         "turning persistency off. Exception: " + Util.printStackTrace(ex));
231                 persistent=false;
232             }
233         }
234
235         state_promise.reset();
236         rc=channel.getState(null, state_timeout);
237         if(rc) {
238             if(log.isInfoEnabled()) log.info("state was retrieved successfully, waiting for setState()");
239             Boolean JavaDoc result=(Boolean JavaDoc)state_promise.getResult(state_timeout);
240             if(result == null) {
241                 if(log.isErrorEnabled()) log.error("setState() never got called");
242             }
243             else {
244                 if(log.isInfoEnabled()) log.info("setState() was called");
245             }
246         }
247         else {
248             if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)");
249             if(persistent) {
250                 if(log.isInfoEnabled()) log.info("fetching state from database");
251                 try {
252                     Map JavaDoc<K,V> m=persistence_mgr.retrieveAll();
253                     if(m != null) {
254                         K key;
255                         V val;
256                         for(Map.Entry JavaDoc<K,V> entry: m.entrySet()) {
257                             key=entry.getKey();
258                             val=entry.getValue();
259                             if(log.isTraceEnabled()) log.trace("inserting " + key + " --> " + val);
260                             put(key, val); // will replicate key and value
261
}
262                     }
263                 }
264                 catch(Throwable JavaDoc ex) {
265                     if(log.isErrorEnabled()) log.error("failed creating PersistenceManager, " +
266                             "turning persistency off. Exception: " + Util.printStackTrace(ex));
267                     persistent=false;
268                 }
269             }
270         }
271     }
272
273
274     public Address getLocalAddress() {
275         return channel != null? channel.getLocalAddress() : null;
276     }
277
278     public String JavaDoc getClusterName() {
279         return cluster_name;
280     }
281
282     public Channel getChannel() {
283         return channel;
284     }
285
286     public boolean getPersistent() {
287         return persistent;
288     }
289
290     public void setPersistent(boolean p) {
291         persistent=p;
292     }
293
294
295     public void setDeadlockDetection(boolean flag) {
296         if(disp != null)
297             disp.setDeadlockDetection(flag);
298     }
299
300     public void addNotifier(Notification n) {
301         if(!notifs.contains(n))
302             notifs.addElement(n);
303     }
304
305     public void removeNotifier(Notification n) {
306         if(notifs.contains(n))
307             notifs.removeElement(n);
308     }
309
310     public void stop() {
311         if(disp != null) {
312             disp.stop();
313             disp=null;
314         }
315         if(channel != null) {
316             channel.close();
317             channel=null;
318         }
319     }
320
321
322
323
324     /**
325      * Maps the specified key to the specified value in the hashtable. Neither of both parameters can be null
326      * @param key - the hashtable key
327      * @param value - the value
328      * @return the previous value of the specified key in this hashtable, or null if it did not have one
329      */

330     public V put(K key, V value) {
331         V prev_val=get(key);
332
333         if(send_message == true) {
334             try {
335                 MethodCall call=new MethodCall((short)1, new Object JavaDoc[]{key, value});
336                 disp.callRemoteMethods(null, call, update_mode, timeout);
337             }
338             catch(Exception JavaDoc e) {
339                 throw new RuntimeException JavaDoc("put(" + key + ", " + value + ") failed", e);
340             }
341         }
342         else {
343             _put(key, value);
344         }
345         return prev_val;
346     }
347
348
349     /**
350      * Copies all of the mappings from the specified Map to this hashmap.
351      * These mappings will replace any mappings that this hashmap had for any of the keys currently in the specified Map.
352      * @param m - Mappings to be stored in this map
353      */

354     public void putAll(Map JavaDoc<? extends K, ? extends V> m) {
355         if(send_message == true) {
356             try {
357                 MethodCall call=new MethodCall((short)2, new Object JavaDoc[]{m});
358                 disp.callRemoteMethods(null, call, update_mode, timeout);
359             }
360             catch(Throwable JavaDoc t) {
361                 throw new RuntimeException JavaDoc("putAll() failed", t);
362             }
363         }
364         else {
365             _putAll(m);
366         }
367     }
368
369     /**
370      * Clears this hashtable so that it contains no keys
371      */

372     public void clear() {
373         //Changes done by <aos>
374
//if true, propagate action to the group
375
if(send_message == true) {
376             try {
377                 MethodCall call=new MethodCall((short)4, null);
378                 disp.callRemoteMethods(null, call, update_mode, timeout);
379             }
380             catch(Exception JavaDoc e) {
381                 throw new RuntimeException JavaDoc("clear() failed", e);
382             }
383         }
384         else {
385             _clear();
386         }
387     }
388
389     /**
390      * Removes the key (and its corresponding value) from the hashmap.
391      * @param key - the key to be removed.
392      * @return the value to which the key had been mapped in this hashtable, or null if the key did not have a mapping.
393      */

394     public V remove(Object JavaDoc key) {
395         V retval=get(key);
396
397         //Changes done by <aos>
398
//if true, propagate action to the group
399
if(send_message == true) {
400             try {
401                 MethodCall call=new MethodCall((short)3, new Object JavaDoc[]{key});
402                 disp.callRemoteMethods(null, call, update_mode, timeout);
403             }
404             catch(Exception JavaDoc e) {
405                  throw new RuntimeException JavaDoc("remove(" + key + ") failed", e);
406             }
407         }
408         else {
409             _remove(key);
410             //don't have to do retval = super.remove(..) as is done at the beginning
411
}
412         return retval;
413     }
414
415
416
417     /*------------------------ Callbacks -----------------------*/
418
419     public V _put(K key, V value) {
420         V retval=super.put(key, value);
421         if(persistent) {
422             try {
423                 persistence_mgr.save(key, value);
424             }
425             catch(CannotPersistException cannot_persist_ex) {
426                 if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " +
427                         value + ", exception=" + cannot_persist_ex);
428             }
429             catch(Throwable JavaDoc t) {
430                 if(log.isErrorEnabled()) log.error("failed persisting " + key + " + " +
431                         value + ", exception=" + Util.printStackTrace(t));
432             }
433         }
434         for(int i=0; i < notifs.size(); i++)
435             notifs.elementAt(i).entrySet(key, value);
436         return retval;
437     }
438
439
440     /**
441      * @see java.util.Map#putAll(java.util.Map)
442      */

443     public void _putAll(Map JavaDoc<? extends K, ? extends V> map) {
444         if(map == null)
445             return;
446
447         // Calling the method below seems okay, but would result in ... deadlock !
448
// The reason is that Map.putAll() calls put(), which we override, which results in
449
// lock contention for the map.
450

451         // ---> super.putAll(m); <--- CULPRIT !!!@#$%$
452

453         // That said let's do it the stupid way:
454
for(Map.Entry JavaDoc<? extends K,? extends V> entry: map.entrySet()) {
455             super.put(entry.getKey(), entry.getValue());
456         }
457
458         if(persistent) {
459             try {
460                 persistence_mgr.saveAll(map);
461             }
462             catch(CannotPersistException persist_ex) {
463                 if(log.isErrorEnabled()) log.error("failed persisting contents: " + persist_ex);
464             }
465             catch(Throwable JavaDoc t) {
466                 if(log.isErrorEnabled()) log.error("failed persisting contents: " + t);
467             }
468         }
469         for(int i=0; i < notifs.size(); i++)
470             notifs.elementAt(i).contentsSet(map);
471     }
472
473
474     public void _clear() {
475         super.clear();
476         if(persistent) {
477             try {
478                 persistence_mgr.clear();
479             }
480             catch(CannotRemoveException cannot_remove_ex) {
481                 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex);
482             }
483             catch(Throwable JavaDoc t) {
484                 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t);
485             }
486         }
487         for(int i=0; i < notifs.size(); i++)
488             notifs.elementAt(i).contentsCleared();
489     }
490
491
492     public V _remove(Object JavaDoc key) {
493         V retval=super.remove(key);
494         if(persistent) {
495             try {
496                 persistence_mgr.remove((Serializable)key);
497             }
498             catch(CannotRemoveException cannot_remove_ex) {
499                 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + cannot_remove_ex);
500             }
501             catch(Throwable JavaDoc t) {
502                 if(log.isErrorEnabled()) log.error("failed clearing contents, exception=" + t);
503             }
504         }
505         for(int i=0; i < notifs.size(); i++)
506             notifs.elementAt(i).entryRemoved(key);
507
508         return retval;
509     }
510
511     /*----------------------------------------------------------*/
512
513     /*-------------------- State Exchange ----------------------*/
514
515     public void receive(Message msg) {
516     }
517
518     public byte[] getState() {
519         K key;
520         V val;
521         Map JavaDoc<K,V> copy=new HashMap JavaDoc<K,V>();
522
523         for(Map.Entry JavaDoc<K,V> entry: entrySet()) {
524             key=entry.getKey();
525             val=entry.getValue();
526             copy.put(key, val);
527         }
528         try {
529             return Util.objectToByteBuffer(copy);
530         }
531         catch(Throwable JavaDoc ex) {
532             if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex);
533             return null;
534         }
535     }
536
537
538     public void setState(byte[] new_state) {
539         HashMap JavaDoc<K,V> new_copy;
540
541         try {
542             new_copy=(HashMap JavaDoc<K,V>)Util.objectFromByteBuffer(new_state);
543             if(new_copy == null)
544                 return;
545         }
546         catch(Throwable JavaDoc ex) {
547             if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex);
548             return;
549         }
550         _putAll(new_copy);
551         state_promise.setResult(Boolean.TRUE);
552     }
553
554     /*------------------- Membership Changes ----------------------*/
555
556     public void viewAccepted(View new_view) {
557         Vector JavaDoc<Address> new_mbrs=new_view.getMembers();
558
559         if(new_mbrs != null) {
560             sendViewChangeNotifications(new_view, new_mbrs, new Vector JavaDoc<Address>(members)); // notifies observers (joined, left)
561
members.clear();
562             members.addAll(new_mbrs);
563         }
564         //if size is bigger than one, there are more peers in the group
565
//otherwise there is only one server.
566
send_message=members.size() > 1;
567     }
568
569
570     /**
571      * Called when a member is suspected
572      */

573     public void suspect(Address suspected_mbr) {
574         ;
575     }
576
577
578     /**
579      * Block sending and receiving of messages until ViewAccepted is called
580      */

581     public void block() {
582     }
583
584
585     void sendViewChangeNotifications(View view, Vector JavaDoc<Address> new_mbrs, Vector JavaDoc<Address> old_mbrs) {
586         Vector JavaDoc<Address> joined, left;
587         Notification n;
588
589         if((notifs.isEmpty()) || (old_mbrs == null) || (new_mbrs == null) ||
590                 (old_mbrs.isEmpty()) || (new_mbrs.isEmpty()))
591             return;
592
593         // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
594
joined=new Vector JavaDoc<Address>();
595         for(Address mbr: new_mbrs) {
596             if(!old_mbrs.contains(mbr))
597                 joined.addElement(mbr);
598         }
599
600         // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
601
left=new Vector JavaDoc<Address>();
602         for(Address mbr: old_mbrs) {
603             if(!new_mbrs.contains(mbr)) {
604                 left.addElement(mbr);
605             }
606         }
607
608         for(int i=0; i < notifs.size(); i++) {
609             n=notifs.elementAt(i);
610             n.viewChange(view, joined, left);
611         }
612     }
613
614
615
616
617     public byte[] getState(String JavaDoc state_id) {
618         // not implemented
619
return null;
620     }
621
622     public void getState(OutputStream ostream) {
623         K key;
624         V val;
625         HashMap JavaDoc<K,V> copy=new HashMap JavaDoc<K,V>();
626         ObjectOutputStream oos=null;
627
628         for(Map.Entry JavaDoc<K,V> entry: entrySet()) {
629             key=entry.getKey();
630             val=entry.getValue();
631             copy.put(key, val);
632         }
633         try {
634             oos=new ObjectOutputStream(ostream);
635             oos.writeObject(copy);
636         }
637         catch(Throwable JavaDoc ex) {
638             if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex);
639         }
640         finally {
641             Util.close(oos);
642         }
643     }
644
645     public void getState(String JavaDoc state_id, OutputStream ostream) {
646     }
647
648     public void setState(String JavaDoc state_id, byte[] state) {
649     }
650
651     public void setState(InputStream istream) {
652         HashMap JavaDoc<K,V> new_copy=null;
653         ObjectInputStream ois=null;
654         try {
655             ois=new ObjectInputStream(istream);
656             new_copy=(HashMap JavaDoc<K,V>)ois.readObject();
657             ois.close();
658         }
659         catch(Throwable JavaDoc e) {
660             e.printStackTrace();
661             if(log.isErrorEnabled()) log.error("exception marshalling state: " + e);
662         }
663         finally {
664             Util.close(ois);
665         }
666         if(new_copy != null)
667             _putAll(new_copy);
668
669         state_promise.setResult(Boolean.TRUE);
670     }
671
672     public void setState(String JavaDoc state_id, InputStream istream) {
673     }
674
675     public void unblock() {
676     }
677
678 }
Popular Tags