1 4 package org.jboss.cache.interceptors; 5 6 import org.jboss.cache.CacheSPI; 7 import org.jboss.cache.buddyreplication.BuddyManager; 8 import org.jboss.cache.marshall.MethodCall; 9 import org.jboss.cache.marshall.MethodCallFactory; 10 import org.jboss.cache.marshall.MethodDeclarations; 11 import org.jgroups.Address; 12 13 import java.util.Iterator ; 14 import java.util.List ; 15 16 21 public abstract class BaseRpcInterceptor extends Interceptor 22 { 23 24 private BuddyManager buddyManager; 25 private boolean usingBuddyReplication; 26 27 public void setCache(CacheSPI cache) 28 { 29 super.setCache(cache); 30 buddyManager = cache.getBuddyManager(); 31 usingBuddyReplication = buddyManager != null; 32 } 33 34 41 protected void checkResponses(List rsps) throws Throwable 42 { 43 Object rsp; 44 if (rsps != null) 45 { 46 for (Iterator it = rsps.iterator(); it.hasNext();) 47 { 48 rsp = it.next(); 49 if (rsp != null && rsp instanceof Throwable ) 50 { 51 if (log.isDebugEnabled()) 53 log.debug("Received Throwable from remote node", (Throwable ) rsp); 54 throw (Throwable ) rsp; 55 } 56 } 57 } 58 } 59 60 protected void replicateCall(MethodCall call, boolean sync) throws Throwable 61 { 62 replicateCall(null, call, sync); 63 } 64 65 protected void replicateCall(List <Address> recipients, MethodCall call, boolean sync) throws Throwable 66 { 67 68 if (log.isTraceEnabled()) log.trace("Broadcasting call " + call + " to recipient list " + recipients); 69 70 if (!sync && cache.getRPCManager().getReplicationQueue() != null && !usingBuddyReplication) 71 { 72 putCallOnAsyncReplicationQueue(call); 73 } 74 else 75 { 76 if (usingBuddyReplication) call = buddyManager.transformFqns(call); 77 78 List <Address> callRecipients = recipients; 79 if (callRecipients == null) 80 { 81 callRecipients = usingBuddyReplication ? buddyManager.getBuddyAddresses() : cache.getMembers(); 82 } 83 84 List rsps = cache.getRPCManager().callRemoteMethods(callRecipients, 85 MethodDeclarations.replicateMethod, 86 new Object []{call}, 87 sync, true, configuration.getSyncReplTimeout()); 90 if (log.isTraceEnabled()) 91 { 92 log.trace("responses=" + rsps); 93 } 94 if (sync) checkResponses(rsps); 95 } 96 97 } 98 99 protected void putCallOnAsyncReplicationQueue(MethodCall call) 100 { 101 if (log.isDebugEnabled()) log.debug("Putting call " + call + " on the replication queue."); 102 cache.getRPCManager().getReplicationQueue().add(MethodCallFactory.create(MethodDeclarations.replicateMethod, call)); 103 } 104 105 protected boolean containsModifications(MethodCall m) 106 { 107 switch (m.getMethodId()) 108 { 109 case MethodDeclarations.prepareMethod_id: 110 case MethodDeclarations.optimisticPrepareMethod_id: 111 List mods = (List ) m.getArgs()[1]; 112 return mods.size() > 0; 113 case MethodDeclarations.commitMethod_id: 114 case MethodDeclarations.rollbackMethod_id: 115 return cache.getInvocationContext().isTxHasMods(); 116 default: 117 return false; 118 } 119 } 120 } | Popular Tags |