1 3 package org.jgroups.blocks; 4 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 import org.jgroups.*; 8 import org.jgroups.util.RspList; 9 10 import java.io.Serializable ; 11 12 13 14 15 16 28 public class ReplicationManager implements RequestHandler { 29 Address local_addr=null; 30 ReplicationReceiver receiver=null; 31 32 33 protected MessageDispatcher disp=null; 34 35 protected final Log log=LogFactory.getLog(this.getClass()); 36 37 38 39 42 public ReplicationManager(Channel channel, 43 MessageListener ml, 44 MembershipListener l, 45 ReplicationReceiver receiver) { 46 setReplicationReceiver(receiver); 47 if(channel != null) { 48 local_addr=channel.getLocalAddress(); 49 disp=new MessageDispatcher(channel, 50 ml, 51 l, 52 this, true); } 55 } 56 57 58 61 public ReplicationManager(PullPushAdapter adapter, 62 Serializable id, 63 MessageListener ml, 64 MembershipListener l, 65 ReplicationReceiver receiver) { 66 if(adapter != null && adapter.getTransport() != null && adapter.getTransport() instanceof Channel) 67 local_addr=((Channel)adapter.getTransport()).getLocalAddress(); 68 setReplicationReceiver(receiver); 69 disp=new MessageDispatcher(adapter, 70 id, ml, 72 l, 73 this); disp.setDeadlockDetection(true); 75 } 76 77 78 79 public void stop() { 80 if(disp != null) 81 disp.stop(); 82 } 83 84 85 86 92 public Xid begin() throws Exception { 93 return begin(Xid.DIRTY_READS); 94 } 95 96 97 105 public Xid begin(int transaction_mode) throws Exception { 106 return Xid.create(local_addr, transaction_mode); 107 } 108 109 110 public void setReplicationReceiver(ReplicationReceiver handler) { 111 this.receiver=handler; 112 } 113 114 public void setMembershipListener(MembershipListener l) { 115 if(l == null) 116 return; 117 if(disp == null) 118 if(log.isErrorEnabled()) log.error("dispatcher is null, cannot set MembershipListener"); 119 else 120 disp.setMembershipListener(l); 121 } 122 123 124 190 public RspList send(Address dest, 191 byte[] data, 192 boolean synchronous, 193 long synchronous_timeout, 194 Xid transaction, 195 byte[] lock_info, 196 long lock_acquisition_timeout, 197 long lock_lease_timeout, 198 boolean use_locks) { 200 Message msg=null; 201 ReplicationData d=new ReplicationData(ReplicationData.SEND, 202 data, 203 transaction, 204 lock_info, 205 lock_acquisition_timeout, 206 lock_lease_timeout, 207 use_locks); 208 209 if(log.isInfoEnabled()) log.info("data is " + d + " (synchronous=" + synchronous + ')'); 210 msg=new Message(dest, null, d); 211 if(synchronous) { 212 return disp.castMessage(null, msg, GroupRequest.GET_ALL, synchronous_timeout); 213 } 214 else { 215 disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); 216 return null; 217 } 218 } 219 220 221 226 public void commit(Xid transaction) { 227 sendMessage(ReplicationData.COMMIT, transaction); 228 } 229 230 231 235 public void rollback(Xid transaction) { 236 sendMessage(ReplicationData.ROLLBACK, transaction); 237 } 238 239 240 241 242 public Object handle(Message msg) { 243 Object retval=null; 244 ReplicationData data; 245 246 if(msg == null) { 247 if(log.isErrorEnabled()) log.error("received message was null"); 248 return null; 249 } 250 251 if(msg.getLength() == 0) { 252 if(log.isErrorEnabled()) log.error("payload of received message was null"); 253 return null; 254 } 255 256 try { 257 data=(ReplicationData)msg.getObject(); 258 } 259 catch(Throwable ex) { 260 if(log.isErrorEnabled()) log.error("failure unmarshalling message: " + ex); 261 return null; 262 } 263 264 switch(data.getType()) { 265 case ReplicationData.SEND: 266 try { 267 return handleSend(data); 268 } 269 catch(Throwable ex) { 270 if(log.isErrorEnabled()) log.error("failed handling update: " + ex); 271 return ex; 272 } 273 case ReplicationData.COMMIT: 274 handleCommit(data.getTransaction()); 275 break; 276 case ReplicationData.ROLLBACK: 277 handleRollback(data.getTransaction()); 278 break; 279 default: 280 if(log.isErrorEnabled()) log.error("received incorrect replication message: " + data); 281 return null; 282 } 283 284 return retval; 285 } 286 287 288 289 290 protected Object handleSend(ReplicationData data) throws UpdateException, LockingException { 291 try { 292 if(receiver == null) { 293 if(log.isWarnEnabled()) log.warn("receiver is not set"); 294 return null; 295 } 296 return receiver.receive(data.getTransaction(), 297 data.getData(), 298 data.getLockInfo(), 299 data.getLockAcquisitionTimeout(), 300 data.getLockLeaseTimeout(), 301 data.useLocks()); 302 } 303 catch(Throwable ex) { 304 return ex; 305 } 306 } 307 308 309 protected void handleCommit(Xid transaction) { 310 if(receiver == null) { 311 if(log.isWarnEnabled()) log.warn("receiver is not set"); 312 } 313 else 314 receiver.commit(transaction); 315 } 316 317 protected void handleRollback(Xid transaction) { 318 if(receiver == null) { 319 if(log.isWarnEnabled()) log.warn("receiver is not set"); 320 } 321 else 322 receiver.rollback(transaction); 323 } 324 325 326 327 328 329 330 331 void sendMessage(int type, Xid transaction) { 332 ReplicationData data=new ReplicationData(type, null, transaction, null, 0, 0, false); 333 Message msg=new Message(null, null, data); 334 disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); } 336 337 338 339 340 } 341 | Popular Tags |