KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > ReplicatedHashtable


1 // $Id: ReplicatedHashtable.java,v 1.9 2005/02/19 13:23:34 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.*;
8 import org.jgroups.util.Util;
9
10 import java.io.Serializable JavaDoc;
11 import java.util.*;
12
13 /**
14  * Provides the abstraction of a java.util.Hashtable that is replicated at several
15  * locations. Any change to the hashtable (clear, put, remove etc) will transparently be
16  * propagated to all replicas in the group. All read-only methods will always access the
17  * local replica.<p>
18  * Both keys and values added to the hashtable <em>must be serializable</em>, the reason
19  * being that they will be sent across the network to all replicas of the group. Having said
20  * this, it is now for example possible to add RMI remote objects to the hashtable as they
21  * are derived from <code>java.rmi.server.RemoteObject</code> which in turn is serializable.
22  * This allows to lookup shared distributed objects by their name and invoke methods on them,
23  * regardless of one's onw location. A <code>ReplicatedHashtable</code> thus allows to
24  * implement a distributed naming service in just a couple of lines.<p>
25  * An instance of this class will contact an existing member of the group to fetch its
26  * initial state.<p>
27  * Contrary to DistributedHashtable, this class does not make use of RpcDispatcher (and RequestCorrelator)
28  * but uses plain asynchronous messages instead.
29  * @author Bela Ban
30  * @author <a HREF="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a>
31  * todo: implement putAll() [similar to DistributedHashtable]
32  * @deprecated This class is unsupported; use JBossCache instead: http://www.jboss.com/products/jbosscache
33  */

34 public class ReplicatedHashtable extends Hashtable implements MessageListener, MembershipListener {
35
36     public interface Notification {
37         void entrySet(Object JavaDoc key, Object JavaDoc value);
38
39         void entryRemoved(Object JavaDoc key);
40
41         void viewChange(Vector new_mbrs, Vector old_mbrs);
42
43         void contentsSet(Map new_entries);
44     }
45
46     public interface StateTransferListener {
47         void stateTransferStarted();
48
49         void stateTransferCompleted(boolean success);
50     }
51
52     transient Channel channel;
53     transient PullPushAdapter adapter=null;
54     final transient Vector notifs=new Vector();
55     // to be notified when mbrship changes
56
final transient Vector members=new Vector(); // keeps track of all DHTs
57
final transient List state_transfer_listeners=new ArrayList();
58     transient boolean state_transfer_running=false;
59
60     /** Determines when the updates have to be sent across the network, avoids sending unnecessary
61      * messages when there are no member in the group */

62     private transient boolean send_message=false;
63
64     protected final transient Log log=LogFactory.getLog(this.getClass());
65
66     /**
67      * Creates a ReplicatedHashtable
68      * @param groupname The name of the group to join
69      * @param factory The ChannelFactory which will be used to create a channel
70      * @param properties The property string to be used to define the channel
71      * @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever.
72      */

73     public ReplicatedHashtable(String JavaDoc groupname, ChannelFactory factory, StateTransferListener l, String JavaDoc properties, long state_timeout) {
74         if(l != null)
75             addStateTransferListener(l);
76         try {
77             channel=factory != null ? factory.createChannel(properties) : new JChannel(properties);
78             channel.connect(groupname);
79             adapter=new PullPushAdapter(channel, this, this);
80             adapter.setListener(this);
81             channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
82             boolean rc=channel.getState(null, state_timeout);
83             if(rc)
84                 if(log.isInfoEnabled()) log.info("state was retrieved successfully");
85             else
86                 if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)");
87         }
88         catch(Exception JavaDoc e) {
89             if(log.isErrorEnabled()) log.error("exception=" + e);
90         }
91     }
92
93     void getInitState(Channel channel, long state_timeout) throws Exception JavaDoc {
94         try {
95             notifyStateTransferStarted();
96             boolean rc=channel.getState(null, state_timeout);
97             if(rc)
98                 if(log.isInfoEnabled()) log.info("state was retrieved successfully");
99             else {
100                 if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)");
101                 notifyStateTransferCompleted(false);
102             }
103         }
104         catch(Exception JavaDoc ex) {
105             notifyStateTransferCompleted(false);
106             throw ex;
107         }
108     }
109
110     public ReplicatedHashtable(String JavaDoc groupname, ChannelFactory factory, String JavaDoc properties, long state_timeout) {
111         this(groupname, factory, null, properties, state_timeout);
112     }
113
114     public ReplicatedHashtable(JChannel channel, long state_timeout) throws ChannelClosedException, ChannelNotConnectedException {
115         this(channel, null, state_timeout);
116     }
117
118     public ReplicatedHashtable(JChannel channel, StateTransferListener l, long state_timeout) throws ChannelClosedException, ChannelNotConnectedException {
119         this.channel=channel;
120         this.adapter=new PullPushAdapter(channel, this, this);
121         this.adapter.setListener(this);
122         if(l != null)
123             addStateTransferListener(l);
124         this.channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
125         boolean rc=channel.getState(null, state_timeout);
126         if(rc)
127             if(log.isInfoEnabled()) log.info("state was retrieved successfully");
128         else
129             if(log.isInfoEnabled()) log.info("state could not be retrieved (first member)");
130     }
131
132     public boolean stateTransferRunning() {
133         return state_transfer_running;
134     }
135
136     public Address getLocalAddress() {
137         return channel != null ? channel.getLocalAddress() : null;
138     }
139
140     public Channel getChannel() {
141         return channel;
142     }
143
144     public void addNotifier(Notification n) {
145         if(!notifs.contains(n))
146             notifs.addElement(n);
147     }
148
149     public void addStateTransferListener(StateTransferListener l) {
150         if(l != null && !(state_transfer_listeners.contains(l)))
151             state_transfer_listeners.add(l);
152     }
153
154     public void removeStateTransferListener(StateTransferListener l) {
155         if(l != null)
156             state_transfer_listeners.remove(l);
157     }
158
159     /**
160      * Maps the specified key to the specified value in the hashtable. Neither of both parameters can be null
161      * @param key - the hashtable key
162      * @param value - the value
163      * @return the previous value of the specified key in this hashtable, or null if it did not have one
164      */

165     public Object JavaDoc put(Object JavaDoc key, Object JavaDoc value) {
166         Message msg;
167         Object JavaDoc prev_val=null;
168         prev_val=get(key);
169
170         //Changes done by <aos>
171
//if true, send message to the group
172
if(send_message == true) {
173             try {
174                 msg=new Message(null, null, new Request(Request.PUT, key, value));
175                 channel.send(msg);
176                 //return prev_val;
177
}
178             catch(Exception JavaDoc e) {
179                 //return null;
180
}
181         }
182         else {
183             super.put(key, value);
184             //don't have to do prev_val = super.put(..) as is done at the beginning
185
}
186         return prev_val;
187     }
188
189     /**
190      * Copies all of the mappings from the specified Map to this Hashtable These mappings will replace any mappings that this Hashtable had for any of the keys currently in the specified Map.
191      * @param m - Mappings to be stored in this map
192      */

193     public void putAll(Map m) {
194         Message msg;
195         //Changes done by <aos>
196
//if true, send message to the group
197
if(send_message == true) {
198             try {
199                 msg=new Message(null, null, new Request(Request.PUT_ALL, null, m));
200                 channel.send(msg);
201             }
202             catch(Exception JavaDoc e) {
203                 if(log.isErrorEnabled()) log.error("exception=" + e);
204             }
205         }
206         else {
207             super.putAll(m);
208         }
209     }
210
211     /**
212      * Clears this hashtable so that it contains no keys
213      */

214     public void clear() {
215         Message msg;
216         //Changes done by <aos>
217
//if true, send message to the group
218
if(send_message == true) {
219             try {
220                 msg=new Message(null, null, new Request(Request.CLEAR, null, null));
221                 channel.send(msg);
222             }
223             catch(Exception JavaDoc e) {
224                 if(log.isErrorEnabled()) log.error("exception=" + e);
225             }
226         }
227         else {
228             super.clear();
229         }
230     }
231
232     /**
233      * Removes the key (and its corresponding value) from the Hashtable.
234      * @param key - the key to be removed.
235      * @return the value to which the key had been mapped in this hashtable, or null if the key did not have a mapping.
236      */

237     public Object JavaDoc remove(Object JavaDoc key) {
238         Message msg;
239         Object JavaDoc retval=null;
240         retval=get(key);
241
242         //Changes done by <aos>
243
//if true, propagate action to the group
244
if(send_message == true) {
245             try {
246                 msg=new Message(null, null, new Request(Request.REMOVE, key, null));
247                 channel.send(msg);
248                 //return retval;
249
}
250             catch(Exception JavaDoc e) {
251                 //return null;
252
}
253         }
254         else {
255             super.remove(key);
256             //don't have to do retval = super.remove(..) as is done at the beginning
257
}
258         return retval;
259     }
260
261     /*------------------------ Callbacks -----------------------*/
262     Object JavaDoc _put(Object JavaDoc key, Object JavaDoc value) {
263         Object JavaDoc retval=super.put(key, value);
264         for(int i=0; i < notifs.size(); i++)
265             ((Notification)notifs.elementAt(i)).entrySet(key, value);
266         return retval;
267     }
268
269     void _clear() {
270         super.clear();
271     }
272
273     Object JavaDoc _remove(Object JavaDoc key) {
274         Object JavaDoc retval=super.remove(key);
275         for(int i=0; i < notifs.size(); i++)
276             ((Notification)notifs.elementAt(i)).entryRemoved(key);
277         return retval;
278     }
279
280     /**
281      * @see java.util.Map#putAll(java.util.Map)
282      */

283     public void _putAll(Map m) {
284         if(m == null)
285             return;
286         //######## The same way as in the DistributedHashtable
287
// Calling the method below seems okay, but would result in ... deadlock !
288
// The reason is that Map.putAll() calls put(), which we override, which results in
289
// lock contention for the map.
290
// ---> super.putAll(m); <--- CULPRIT !!!@#$%$
291
// That said let's do it the stupid way:
292
//######## The same way as in the DistributedHashtable
293
Map.Entry entry;
294         for(Iterator it=m.entrySet().iterator(); it.hasNext();) {
295             entry=(Map.Entry)it.next();
296             super.put(entry.getKey(), entry.getValue());
297         }
298
299         for(int i=0; i < notifs.size(); i++)
300             ((Notification)notifs.elementAt(i)).contentsSet(m);
301     }
302     /*----------------------------------------------------------*/
303
304     /*-------------------- MessageListener ----------------------*/
305
306     public void receive(Message msg) {
307         Request req=null;
308
309         if(msg == null)
310             return;
311         req=(Request)msg.getObject();
312         if(req == null)
313             return;
314         switch(req.req_type) {
315             case Request.PUT:
316                 if(req.key != null && req.val != null)
317                     _put(req.key, req.val);
318                 break;
319             case Request.REMOVE:
320                 if(req.key != null)
321                     _remove(req.key);
322                 break;
323             case Request.CLEAR:
324                 _clear();
325                 break;
326
327             case Request.PUT_ALL:
328                 if(req.val != null)
329                     _putAll((Map)req.val);
330                 break;
331             default :
332                 // error
333
}
334     }
335
336     public byte[] getState() {
337         Object JavaDoc key, val;
338         Hashtable copy=new Hashtable();
339
340         for(Enumeration e=keys(); e.hasMoreElements();) {
341             key=e.nextElement();
342             val=get(key);
343             copy.put(key, val);
344         }
345         try {
346             return Util.objectToByteBuffer(copy);
347         }
348         catch(Exception JavaDoc ex) {
349             if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex);
350             return null;
351         }
352     }
353
354     public void setState(byte[] new_state) {
355         Hashtable new_copy;
356         Object JavaDoc key;
357
358         try {
359             new_copy=(Hashtable)Util.objectFromByteBuffer(new_state);
360             if(new_copy == null) {
361                 notifyStateTransferCompleted(true);
362                 return;
363             }
364         }
365         catch(Throwable JavaDoc ex) {
366             if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex);
367             notifyStateTransferCompleted(false);
368             return;
369         }
370
371         _clear(); // remove all elements
372
for(Enumeration e=new_copy.keys(); e.hasMoreElements();) {
373             key=e.nextElement();
374             _put(key, new_copy.get(key));
375         }
376         notifyStateTransferCompleted(true);
377     }
378
379     /*-------------------- End of MessageListener ----------------------*/
380
381     /*----------------------- MembershipListener ------------------------*/
382
383     public void viewAccepted(View new_view) {
384         Vector new_mbrs=new_view.getMembers();
385
386         if(new_mbrs != null) {
387             sendViewChangeNotifications(new_mbrs, members);
388             // notifies observers (joined, left)
389
members.removeAllElements();
390             for(int i=0; i < new_mbrs.size(); i++)
391                 members.addElement(new_mbrs.elementAt(i));
392         }
393         //if size is bigger than one, there are more peers in the group
394
//otherwise there is only one server.
395
if(members.size() > 1) {
396             send_message=true;
397         }
398         else {
399             send_message=false;
400         }
401     }
402
403     /** Called when a member is suspected */
404     public void suspect(Address suspected_mbr) {
405         ;
406     }
407
408     /** Block sending and receiving of messages until ViewAccepted is called */
409     public void block() {
410     }
411
412     /*------------------- End of MembershipListener ----------------------*/
413
414     void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) {
415         Vector joined, left;
416         Object JavaDoc mbr;
417         Notification n;
418
419         if(notifs.size() == 0 || old_mbrs == null || new_mbrs == null || old_mbrs.size() == 0 || new_mbrs.size() == 0)
420             return;
421
422         // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
423
joined=new Vector();
424         for(int i=0; i < new_mbrs.size(); i++) {
425             mbr=new_mbrs.elementAt(i);
426             if(!old_mbrs.contains(mbr))
427                 joined.addElement(mbr);
428         }
429
430         // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
431
left=new Vector();
432         for(int i=0; i < old_mbrs.size(); i++) {
433             mbr=old_mbrs.elementAt(i);
434             if(!new_mbrs.contains(mbr)) {
435                 left.addElement(mbr);
436             }
437         }
438
439         for(int i=0; i < notifs.size(); i++) {
440             n=(Notification)notifs.elementAt(i);
441             n.viewChange(joined, left);
442         }
443     }
444
445     void notifyStateTransferStarted() {
446         state_transfer_running=true;
447         for(Iterator it=state_transfer_listeners.iterator(); it.hasNext();) {
448             StateTransferListener listener=(StateTransferListener)it.next();
449             try {
450                 listener.stateTransferStarted();
451             }
452             catch(Throwable JavaDoc t) {
453             }
454         }
455     }
456
457     void notifyStateTransferCompleted(boolean success) {
458         state_transfer_running=false;
459         for(Iterator it=state_transfer_listeners.iterator(); it.hasNext();) {
460             StateTransferListener listener=(StateTransferListener)it.next();
461             try {
462                 listener.stateTransferCompleted(success);
463             }
464             catch(Throwable JavaDoc t) {
465             }
466         }
467     }
468
469     private static class Request implements Serializable JavaDoc {
470         static final int PUT=1;
471         static final int REMOVE=2;
472         static final int CLEAR=3;
473         static final int PUT_ALL=4;
474
475         int req_type=0;
476         Object JavaDoc key=null;
477         Object JavaDoc val=null;
478
479         Request(int req_type, Object JavaDoc key, Object JavaDoc val) {
480             this.req_type=req_type;
481             this.key=key;
482             this.val=val;
483         }
484
485         public String JavaDoc toString() {
486             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
487             sb.append(type2String(req_type));
488             if(key != null)
489                 sb.append("\nkey=" + key);
490             if(val != null)
491                 sb.append("\nval=" + val);
492             return sb.toString();
493         }
494
495         String JavaDoc type2String(int t) {
496             switch(t) {
497                 case PUT:
498                     return "PUT";
499                 case REMOVE:
500                     return "REMOVE";
501                 case CLEAR:
502                     return "CLEAR";
503                 case PUT_ALL:
504                     return "PUT_ALL";
505                 default :
506                     return "<unknown>";
507             }
508         }
509
510     }
511 }
512
Popular Tags