1 22 package org.jboss.invocation.jrmp.interfaces; 23 24 import java.io.Externalizable ; 25 import java.io.IOException ; 26 import java.io.ObjectInput ; 27 import java.io.ObjectOutput ; 28 import java.rmi.MarshalledObject ; 29 import java.rmi.RemoteException ; 30 import java.rmi.ServerException ; 31 import java.util.ArrayList ; 32 import java.util.HashSet ; 33 import java.util.List ; 34 import java.util.WeakHashMap ; 35 import javax.transaction.TransactionRolledbackException ; 36 37 import org.jboss.ha.framework.interfaces.ClusteringTargetsRepository; 38 import org.jboss.ha.framework.interfaces.FamilyClusterInfo; 39 import org.jboss.ha.framework.interfaces.GenericClusteringException; 40 import org.jboss.ha.framework.interfaces.HARMIResponse; 41 import org.jboss.ha.framework.interfaces.LoadBalancePolicy; 42 import org.jboss.invocation.Invocation; 43 import org.jboss.invocation.Invoker; 44 import org.jboss.invocation.InvokerInterceptor; 45 import org.jboss.invocation.InvokerProxyHA; 46 import org.jboss.invocation.MarshalledInvocation; 47 import org.jboss.invocation.PayloadKey; 48 import org.jboss.invocation.ServiceUnavailableException; 49 import org.jboss.logging.Logger; 50 51 59 public class JRMPInvokerProxyHA 60 extends JRMPInvokerProxy 61 implements InvokerProxyHA, Externalizable 62 { 63 67 private static final long serialVersionUID = -967671822225981666L; 68 private static final Logger log = Logger.getLogger(JRMPInvokerProxyHA.class); 69 public static final HashSet colocation = new HashSet (); 70 public static final WeakHashMap txFailoverAuthorizations = new WeakHashMap (); 71 72 protected LoadBalancePolicy loadBalancePolicy; 73 protected String proxyFamilyName = null; 74 75 FamilyClusterInfo familyClusterInfo = null; 76 78 protected transient boolean trace = false; 79 80 public JRMPInvokerProxyHA() {} 81 82 public JRMPInvokerProxyHA(List targets, LoadBalancePolicy policy, 83 String proxyFamilyName, long viewId) 84 { 85 this.familyClusterInfo = ClusteringTargetsRepository.initTarget (proxyFamilyName, targets, viewId); 86 this.loadBalancePolicy = policy; 87 this.proxyFamilyName = proxyFamilyName; 88 this.trace = log.isTraceEnabled(); 89 if( trace ) 90 log.trace("Init, cluterInfo: "+familyClusterInfo+", policy="+loadBalancePolicy); 91 } 92 93 public void updateClusterInfo (ArrayList targets, long viewId) 94 { 95 if (familyClusterInfo != null) 96 this.familyClusterInfo.updateClusterInfo (targets, viewId); 97 } 98 99 public Object getRemoteTarget() 100 { 101 return getRemoteTarget(null); 102 } 103 public Object getRemoteTarget(Invocation invocationBasedRouting) 104 { 105 return loadBalancePolicy.chooseTarget(this.familyClusterInfo, invocationBasedRouting); 106 } 107 108 public void remoteTargetHasFailed(Object target) 109 { 110 removeDeadTarget(target); 111 } 112 113 protected void removeDeadTarget(Object target) 114 { 115 if (this.familyClusterInfo != null) 117 this.familyClusterInfo.removeDeadTarget (target); 118 } 119 120 protected int totalNumberOfTargets () 121 { 122 if (this.familyClusterInfo != null) 123 return this.familyClusterInfo.getTargets ().size (); 124 else 125 return 0; 126 } 127 128 protected void resetView () 129 { 130 this.familyClusterInfo.resetView (); 131 } 132 133 136 public boolean isLocal(Invocation invocation) 137 { 138 return colocation.contains(invocation.getObjectName()); 139 } 140 141 public boolean txContextAllowsFailover (Invocation invocation) 142 { 143 javax.transaction.Transaction tx = invocation.getTransaction(); 144 if (tx != null) 145 { 146 synchronized (tx) 147 { 148 return ! txFailoverAuthorizations.containsKey (tx); 149 } 150 } 151 else 152 { 153 return true; 154 } 155 } 156 157 public void invocationHasReachedAServer (Invocation invocation) 158 { 159 javax.transaction.Transaction tx = invocation.getTransaction(); 160 if (tx != null) 161 { 162 synchronized (tx) 163 { 164 txFailoverAuthorizations.put (tx, null); 165 } 166 } 167 } 168 169 173 public Object invoke(Invocation invocation) 174 throws Exception 175 { 176 int failoverCounter = 0; 180 invocation.setValue ("FAILOVER_COUNTER", new Integer (failoverCounter), PayloadKey.AS_IS); 181 182 if (isLocal(invocation)) 184 { 185 return InvokerInterceptor.getLocal().invoke(invocation); 186 } 187 else 188 { 189 MarshalledInvocation mi = new MarshalledInvocation(invocation); 191 192 mi.setTransactionPropagationContext(getTransactionPropagationContext()); 194 mi.setValue("CLUSTER_VIEW_ID", new Long (this.familyClusterInfo.getCurrentViewId ())); 195 Invoker target = (Invoker)getRemoteTarget(invocation); 196 197 boolean failoverAuthorized = true; 198 Exception lastException = null; 199 while (target != null && failoverAuthorized) 200 { 201 boolean definitivlyRemoveNodeOnFailure = true; 202 try 203 { 204 if( trace ) 205 log.trace("Invoking on target="+target); 206 Object rtnObj = target.invoke(mi); 207 HARMIResponse rsp = null; 208 if (rtnObj instanceof MarshalledObject ) 209 { 210 rsp = (HARMIResponse)((MarshalledObject )rtnObj).get(); 211 } 212 else 213 { 214 rsp = (HARMIResponse)rtnObj; 215 } 216 if (rsp.newReplicants != null) 217 { 218 if( trace ) 219 { 220 log.trace("newReplicants: "+rsp.newReplicants); 221 } 222 updateClusterInfo (rsp.newReplicants, rsp.currentViewId); 223 } 224 226 invocationHasReachedAServer (invocation); 227 228 return rsp.response; 229 } 230 catch (java.net.ConnectException e) 231 { 232 lastException = e; 233 } 234 catch (java.net.UnknownHostException e) 235 { 236 lastException = e; 237 } 238 catch (java.rmi.ConnectException e) 239 { 240 lastException = e; 241 } 242 catch (java.rmi.ConnectIOException e) 243 { 244 lastException = e; 245 } 246 catch (java.rmi.NoSuchObjectException e) 247 { 248 lastException = e; 249 } 250 catch (java.rmi.UnknownHostException e) 251 { 252 lastException = e; 253 } 254 catch (GenericClusteringException e) 255 { 256 lastException = e; 257 if (e.getCompletionStatus () == GenericClusteringException.COMPLETED_NO) 262 { 263 if (totalNumberOfTargets() >= failoverCounter) 267 { 268 if (!e.isDefinitive ()) 269 definitivlyRemoveNodeOnFailure = false; 270 } 271 } 272 else 273 { 274 invocationHasReachedAServer (invocation); 275 throw new ServerException ("Clustering error", e); 276 } 277 } 278 catch (ServerException e) 279 { 280 invocationHasReachedAServer (invocation); 283 if (e.detail instanceof TransactionRolledbackException ) 284 { 285 throw (TransactionRolledbackException ) e.detail; 286 } 287 if (e.detail instanceof RemoteException ) 288 { 289 throw (RemoteException ) e.detail; 290 } 291 throw e; 292 } 293 catch (Exception e) 294 { 295 lastException = e; 296 invocationHasReachedAServer (invocation); 297 throw e; 298 } 299 300 if( trace ) 301 log.trace("Invoke failed, target="+target, lastException); 302 303 remoteTargetHasFailed(target); 305 if (!definitivlyRemoveNodeOnFailure) 306 { 307 resetView (); 308 } 309 310 failoverAuthorized = txContextAllowsFailover (invocation); 311 target = (Invoker)getRemoteTarget(invocation); 312 313 failoverCounter++; 314 mi.setValue ("FAILOVER_COUNTER", new Integer (failoverCounter), PayloadKey.AS_IS); 315 } 316 String msg = "Service unavailable."; 318 if (failoverAuthorized == false) 319 { 320 msg = "Service unavailable (failover not possible inside a user transaction)."; 321 } 322 throw new ServiceUnavailableException(msg, lastException); 323 } 324 } 325 326 333 public void writeExternal(final ObjectOutput out) 334 throws IOException 335 { 336 List targets = null; 338 long vid = 0; 339 synchronized (this.familyClusterInfo) 340 { 341 targets = this.familyClusterInfo.getTargets (); 342 vid = this.familyClusterInfo.getCurrentViewId (); 343 } 344 out.writeObject(targets); 345 out.writeObject(this.loadBalancePolicy); 346 out.writeObject (this.proxyFamilyName); 347 out.writeLong (vid); 348 } 349 350 355 public void readExternal(final ObjectInput in) 356 throws IOException , ClassNotFoundException 357 { 358 List targets = (List )in.readObject(); 359 this.loadBalancePolicy = (LoadBalancePolicy)in.readObject(); 360 this.proxyFamilyName = (String )in.readObject(); 361 long vid = in.readLong (); 362 363 this.familyClusterInfo = ClusteringTargetsRepository.initTarget (this.proxyFamilyName, targets, vid); 366 this.trace = log.isTraceEnabled(); 367 if( trace ) 368 log.trace("Init, clusterInfo: "+familyClusterInfo+", policy="+loadBalancePolicy); 369 } 370 371 373 } 375 | Popular Tags |