KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > ReplicationManager


1 // $Id: ReplicationManager.java,v 1.7 2004/09/23 16:29:11 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.*;
8 import org.jgroups.util.RspList;
9
10 import java.io.Serializable JavaDoc;
11
12
13
14
15
16 /**
17  * Class to propagate updates to a number of nodes in various ways:
18  * <ol>
19  * <li>Asynchronous
20  * <li>Synchronous
21  * <li>Synchronous with locking
22  * </ol>
23  *
24  * <br/><em>Note: This class is experimental as of Oct 2002</em>
25  *
26  * @author Bela Ban Oct 2002
27  */

28 public class ReplicationManager implements RequestHandler {
29     Address local_addr=null;
30     ReplicationReceiver receiver=null;
31
32     /** Used to broadcast updates and receive responses (latter only in synchronous case) */
33     protected MessageDispatcher disp=null;
34
35     protected final Log log=LogFactory.getLog(this.getClass());
36
37
38
39     /**
40      * Creates an instance of ReplicationManager on top of a Channel
41      */

42     public ReplicationManager(Channel channel,
43                               MessageListener ml,
44                               MembershipListener l,
45                               ReplicationReceiver receiver) {
46         setReplicationReceiver(receiver);
47     if(channel != null) {
48             local_addr=channel.getLocalAddress();
49             disp=new MessageDispatcher(channel,
50                                        ml,
51                                        l,
52                                        this, // ReplicationManager is RequestHandler
53
true); // use deadlock detection
54
}
55     }
56     
57
58     /**
59      * Creates an instance of ReplicationManager on top of a PullPushAdapter
60      */

61     public ReplicationManager(PullPushAdapter adapter,
62                               Serializable JavaDoc id,
63                               MessageListener ml,
64                               MembershipListener l,
65                               ReplicationReceiver receiver) {
66         if(adapter != null && adapter.getTransport() != null && adapter.getTransport() instanceof Channel)
67             local_addr=((Channel)adapter.getTransport()).getLocalAddress();
68         setReplicationReceiver(receiver);
69         disp=new MessageDispatcher(adapter,
70                                    id, // FIXME
71
ml,
72                                    l,
73                                    this); // ReplicationManager is RequestHandler
74
disp.setDeadlockDetection(true);
75     }
76     
77
78
79     public void stop() {
80         if(disp != null)
81             disp.stop();
82     }
83     
84
85
86     /**
87      * Create a new transaction. The transaction will be used to send updates, identify updates in the same transaction,
88      * and eventually commit or rollback the changes associated with the transaction.
89      * @return Xid A unique transaction
90      * @exception Exception Thrown when local_addr is null
91      */

92     public Xid begin() throws Exception JavaDoc {
93         return begin(Xid.DIRTY_READS);
94     }
95
96
97     /**
98      * Create a new transaction. The tracsion will be used to send updates, identify updates in the same transaction,
99      * and eventually commit or rollback the changes associated with the transaction.
100      * @param transaction_mode Mode in which the transaction should run. Possible values are Xid.DIRTY_READS,
101      * Xid.READ_COMMITTED, Xid.REPEATABLE_READ and Xid.SERIALIZABLE
102      * @return Xid A unique transaction
103      * @exception Exception Thrown when local_addr is null
104      */

105     public Xid begin(int transaction_mode) throws Exception JavaDoc {
106         return Xid.create(local_addr, transaction_mode);
107     }
108
109     
110     public void setReplicationReceiver(ReplicationReceiver handler) {
111         this.receiver=handler;
112     }
113
114     public void setMembershipListener(MembershipListener l) {
115         if(l == null)
116             return;
117         if(disp == null)
118             if(log.isErrorEnabled()) log.error("dispatcher is null, cannot set MembershipListener");
119         else
120             disp.setMembershipListener(l);
121     }
122     
123
124     /**
125      * Sends a request to all members of the group. Sending is asynchronous (return immediately) or
126      * synchronous (wait for all members to respond). If <code>use_locking</code> is true, then locking
127      * will be used at the receivers to acquire locks before accessing/updating a resource. Locks can be
128      * explicitly set using <code>lock_info</code> or implicitly through <code>data</code>. In the latter
129      * case, locks are induced from the data sent, e.g. if the data is a request for updating a certain row
130      * in a table, then we need to acquire a lock for that table.<p>
131      * In case of using locks, if the transaction associated with update already has a lock for a given resource,
132      * we will return. Otherwise, we will wait for <code>lock_acquisition_timeout</code> milliseconds. If the lock
133      * is not granted within that time a <code>LockingException</code> will be thrown. (<em>We hope to
134      * replace this timeout with a distributed deadlock detection algorithm in the future.</em>)<p>
135      * We have 3 main use case for this method:
136      * <ol>
137      * <li><b>Asynchronous</b>: sends the message and returns immediately. Argument <code>asynchronous</code>
138      * needs to be true. All other arguments except <code>data</code> are ignored and can be null. Will call
139      * <code>update()</code> on the registered ReplicationReceiver at each receiver.
140      * <li><b>Synchronous without locks</b>: sends the message, but returns only after responses from all members
141      * have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
142      * first). Argument <code>asynchronous</code> needs to be false. Argument <code>synchronous_timeout</code>
143      * needs to be >= 0. If it is null the call will not time out, but wait for all responses.
144      * All other arguments (besides <code>data</code> are ignored).
145      * <li><b>Synchronous with locks</b>: sends the message, but returns only after responses from all members
146      * have been received, or <code>synchronous_timeout</code> milliseconds have elapsed (whichever comes
147      * first). At the receiver's side we have to acquire a lock for the resource to be updated, if the
148      * acquisition fails a LockingException will be thrown. The resource to be locked can be found in two ways:
149      * either <code>data</code> contains the resource(c) to be acquired implicitly, or <code>lock_info</code>
150      * lists the resources explicitly, or both. All the locks acquired at the receiver's side should be associated
151      * with <code>transaction</code>. When a <code>commit()</code> is received, the receiver should commit
152      * the modifications to the resource and release all locks. When a <code>rollback()</code> is received,
153      * the receiver should remove all (temporary) modifications and release all locks associated with
154      * <code>transaction</code>.
155      * </ol>
156      * In both the synchronous cases a List of byte[] will be returned if the data was sent to all receivers
157      * successfully, cointaining byte buffers. The list may be empty.
158      * @param dest The destination to which to send the message. Will be sent to all members if null.
159      * @param data The data to be sent to all members. It may contain information about the resource to be locked.
160      * @param synchronous If false the call is asynchronous, ie. non-blocking. If true, the method will wait
161      * until responses from all members have been received (unless a timeout is defined, see below)
162      * @param synchronous_timeout In a synchronous call, we will wait for responses from all members or until
163      * <code>synchronous_timeout</code> have elapsed (whichever comes first). 0 means
164      * to wait forever.
165      * @param transaction The transaction under which all locks for resources should be acquired. The receiver
166      * will probably maintain a lock table with resources as keys and transactions as values.
167      * When an update is received, the receiver checks its lock table: if the resource is
168      * not yet taken, the resource/transaction pair will be added to the lock table. Otherwise,
169      * we check if the transaction's owner associated with the resource is the same as the caller.
170      * If this is the case, the lock will be considered granted, otherwise we will wait for the
171      * resource to become available (for a certain amount of time). When a transaction is
172      * committed or rolled back, all resources associated with this transaction will be released.
173      * @param lock_info Information about resource(s) to be acquired. This may be null, e.g. if this information
174      * is already implied in <code>data</code>. Both <code>data</code> and <code>lock_info</code>
175      * may be used to define the set of resources to be acquired.
176      * @param lock_acquisition_timeout The number of milliseconds to wait until a lock acquisition request is
177      * considered failed (causing a LockingException). If 0 we will wait forever.
178      * (Note that this may lead to deadlocks).
179      * @param lock_lease_timeout The number of milliseconds we want to keep the lock for a resource. After
180      * this time has elapsed, the lock will be released. If 0 we won't release the lock(s)
181      * @param use_locks If this is false, we will ignore all lock information (even if it is specified) and
182      * not use locks at all.
183      * @return RspList A list of Rsps ({@link org.jgroups.util.Rsp}), one for each member. Each one is the result of
184      * {@link ReplicationReceiver#receive}. If a member didn't send a response, the <code>received</code>
185      * field will be false. If the member was suspected while waiting for a response, the <code>
186      * suspected</code> field will be true. If the <code>receive()</code> method in the receiver returned
187      * a value it will be in field <code>retval</code>. If the receiver threw an exception it will also
188      * be in this field.
189      */

190     public RspList send(Address dest,
191                         byte[] data,
192                         boolean synchronous,
193                         long synchronous_timeout,
194                         Xid transaction,
195                         byte[] lock_info,
196                         long lock_acquisition_timeout,
197                         long lock_lease_timeout,
198                         boolean use_locks) { // throws UpdateException, TimeoutException, LockingException {
199

200         Message msg=null;
201         ReplicationData d=new ReplicationData(ReplicationData.SEND,
202                                               data,
203                                               transaction,
204                                               lock_info,
205                                               lock_acquisition_timeout,
206                                               lock_lease_timeout,
207                                               use_locks);
208
209             if(log.isInfoEnabled()) log.info("data is " + d + " (synchronous=" + synchronous + ')');
210         msg=new Message(dest, null, d);
211         if(synchronous) {
212             return disp.castMessage(null, msg, GroupRequest.GET_ALL, synchronous_timeout);
213         }
214         else {
215             disp.castMessage(null, msg, GroupRequest.GET_NONE, 0);
216             return null;
217         }
218     }
219
220     
221     /**
222      * Commits all modifications sent to the receivers via {@link #send} and releases all locks associated with
223      * this transaction. If modifications were made to stable storage (but not to resource), those modifications
224      * would now need to be transferred to the resource (e.g. database).
225      */

226     public void commit(Xid transaction) {
227         sendMessage(ReplicationData.COMMIT, transaction);
228     }
229
230
231     /**
232      * Discards all modifications sent to the receivers via {@link #send} and releases all locks associated with
233      * this transaction.
234      */

235     public void rollback(Xid transaction) {
236         sendMessage(ReplicationData.ROLLBACK, transaction);
237     }
238
239
240     /* ------------------------------- RequestHandler interface ------------------------------ */
241
242     public Object JavaDoc handle(Message msg) {
243         Object JavaDoc retval=null;
244         ReplicationData data;
245
246         if(msg == null) {
247             if(log.isErrorEnabled()) log.error("received message was null");
248             return null;
249         }
250
251         if(msg.getLength() == 0) {
252             if(log.isErrorEnabled()) log.error("payload of received message was null");
253             return null;
254         }
255         
256         try {
257             data=(ReplicationData)msg.getObject();
258         }
259         catch(Throwable JavaDoc ex) {
260             if(log.isErrorEnabled()) log.error("failure unmarshalling message: " + ex);
261             return null;
262         }
263
264         switch(data.getType()) {
265         case ReplicationData.SEND:
266             try {
267                 return handleSend(data);
268             }
269             catch(Throwable JavaDoc ex) {
270                 if(log.isErrorEnabled()) log.error("failed handling update: " + ex);
271                 return ex;
272             }
273         case ReplicationData.COMMIT:
274             handleCommit(data.getTransaction());
275             break;
276         case ReplicationData.ROLLBACK:
277             handleRollback(data.getTransaction());
278             break;
279         default:
280             if(log.isErrorEnabled()) log.error("received incorrect replication message: " + data);
281             return null;
282         }
283
284         return retval;
285     }
286     
287     /* --------------------------- End of RequestHandler interface---------------------------- */
288
289
290     protected Object JavaDoc handleSend(ReplicationData data) throws UpdateException, LockingException {
291         try {
292             if(receiver == null) {
293                 if(log.isWarnEnabled()) log.warn("receiver is not set");
294                 return null;
295             }
296             return receiver.receive(data.getTransaction(),
297                                     data.getData(),
298                                     data.getLockInfo(),
299                                     data.getLockAcquisitionTimeout(),
300                                     data.getLockLeaseTimeout(),
301                                     data.useLocks());
302         }
303         catch(Throwable JavaDoc ex) {
304             return ex;
305         }
306     }
307
308
309     protected void handleCommit(Xid transaction) {
310         if(receiver == null) {
311             if(log.isWarnEnabled()) log.warn("receiver is not set");
312         }
313         else
314             receiver.commit(transaction);
315     }
316
317     protected void handleRollback(Xid transaction) {
318         if(receiver == null) {
319             if(log.isWarnEnabled()) log.warn("receiver is not set");
320         }
321         else
322             receiver.rollback(transaction);
323     }
324
325
326
327
328     /* -------------------------------------- Private methods ------------------------------------ */
329
330     
331     void sendMessage(int type, Xid transaction) {
332         ReplicationData data=new ReplicationData(type, null, transaction, null, 0, 0, false);
333         Message msg=new Message(null, null, data);
334         disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); // send commit message asynchronously
335
}
336
337
338     /* ---------------------------------- End of Private methods --------------------------------- */
339     
340 }
341
Popular Tags