1 22 package org.jboss.invocation.unified.interfaces; 23 24 import java.io.IOException ; 25 import java.io.ObjectInput ; 26 import java.io.ObjectOutput ; 27 import java.io.StreamCorruptedException ; 28 import java.net.MalformedURLException ; 29 import java.rmi.MarshalledObject ; 30 import java.rmi.RemoteException ; 31 import java.rmi.ServerException ; 32 import java.util.ArrayList ; 33 import java.util.List ; 34 import java.util.WeakHashMap ; 35 import org.jboss.ha.framework.interfaces.ClusteringTargetsRepository; 36 import org.jboss.ha.framework.interfaces.FamilyClusterInfo; 37 import org.jboss.ha.framework.interfaces.GenericClusteringException; 38 import org.jboss.ha.framework.interfaces.HARMIResponse; 39 import org.jboss.ha.framework.interfaces.LoadBalancePolicy; 40 import org.jboss.invocation.Invocation; 41 import org.jboss.invocation.PayloadKey; 42 import org.jboss.invocation.ServiceUnavailableException; 43 import org.jboss.remoting.CannotConnectException; 44 import org.jboss.remoting.Client; 45 import org.jboss.remoting.InvokerLocator; 46 47 50 public class UnifiedInvokerHAProxy extends UnifiedInvokerProxy 51 { 52 53 static final long serialVersionUID = -4813929243402349966L; 54 55 private LoadBalancePolicy loadBalancePolicy; 56 private String proxyFamilyName = null; 57 58 private FamilyClusterInfo familyClusterInfo = null; 59 60 public static final WeakHashMap txFailoverAuthorizations = new WeakHashMap (); 61 62 63 public UnifiedInvokerHAProxy() 64 { 65 super(); 66 log.debug("UnifiedInvokerHAProxy constructor called with no arguments."); 67 setSubSystem("invokerha"); 68 } 69 70 public UnifiedInvokerHAProxy(InvokerLocator locator, boolean isStrictRMIException, 71 List targets, LoadBalancePolicy policy, 72 String proxyFamilyName, long viewId) 73 { 74 super(locator, isStrictRMIException); 75 76 this.familyClusterInfo = ClusteringTargetsRepository.initTarget(proxyFamilyName, targets, viewId); 77 this.loadBalancePolicy = policy; 78 this.proxyFamilyName = proxyFamilyName; 79 80 setSubSystem("invokerha"); 81 } 82 83 public boolean txContextAllowsFailover(Invocation invocation) 84 { 85 javax.transaction.Transaction tx = invocation.getTransaction(); 86 if(tx != null) 87 { 88 synchronized(tx) 89 { 90 return ! txFailoverAuthorizations.containsKey(tx); 91 } 92 } 93 else 94 { 95 return true; 96 } 97 } 98 99 public void invocationHasReachedAServer(Invocation invocation) 100 { 101 javax.transaction.Transaction tx = invocation.getTransaction(); 102 if(tx != null) 103 { 104 synchronized(tx) 105 { 106 txFailoverAuthorizations.put(tx, null); 107 } 108 } 109 } 110 111 protected int totalNumberOfTargets() 112 { 113 if(this.familyClusterInfo != null) 114 { 115 return this.familyClusterInfo.getTargets().size(); 116 } 117 else 118 { 119 return 0; 120 } 121 } 122 123 protected void resetView() 124 { 125 this.familyClusterInfo.resetView(); 126 } 127 128 137 protected Client getClient(Invocation invocationBasedRouting) throws MalformedURLException 138 { 139 Object target = loadBalancePolicy.chooseTarget(familyClusterInfo, invocationBasedRouting); 140 InvokerLocator targetLocator = (InvokerLocator) target; 141 142 if(!getLocator().equals(targetLocator)) 144 { 145 init(targetLocator); 146 } 147 return getClient(); 148 } 149 150 151 156 public Object invoke(Invocation invocation) throws Exception 157 { 158 int failoverCounter = 0; 162 invocation.setValue("FAILOVER_COUNTER", new Integer (failoverCounter), PayloadKey.AS_IS); 163 164 Object response = null; 165 Exception lastException = null; 166 167 boolean failoverAuthorized = true; 168 while(familyClusterInfo.getTargets() != null && familyClusterInfo.getTargets().size() > 0 && failoverAuthorized) 169 { 170 boolean definitivlyRemoveNodeOnFailure = true; 171 172 try 173 { 174 invocation.setValue("CLUSTER_VIEW_ID", new Long (this.familyClusterInfo.getCurrentViewId())); 175 176 log.debug("Client cluster view id: " + familyClusterInfo.getCurrentViewId()); 177 log.debug(printPossibleTargets()); 178 179 Client clientInstance = getClient(invocation); 180 181 log.debug("Making invocation on " + clientInstance.getInvoker().getLocator()); 182 183 response = clientInstance.invoke(invocation, null); 184 185 HARMIResponse haResponse = null; 186 187 if(response instanceof Exception ) 188 { 189 log.debug("Invocation returened exception: " + response); 190 if(response instanceof GenericClusteringException) 191 { 192 GenericClusteringException gcex = (GenericClusteringException) response; 193 lastException = gcex; 194 if(gcex.getCompletionStatus() == GenericClusteringException.COMPLETED_NO) 199 { 200 if(totalNumberOfTargets() >= failoverCounter) 204 { 205 if(!gcex.isDefinitive()) 206 { 207 definitivlyRemoveNodeOnFailure = false; 208 } 209 } 210 removeDeadTarget(getLocator()); 211 if(!definitivlyRemoveNodeOnFailure) 212 { 213 resetView(); 214 } 215 failoverAuthorized = txContextAllowsFailover(invocation); 216 217 failoverCounter++; 218 invocation.setValue("FAILOVER_COUNTER", new Integer (failoverCounter), PayloadKey.AS_IS); 219 220 log.debug("Received GenericClusteringException where request was not completed. Will retry."); 221 222 continue; 223 } 224 else 225 { 226 invocationHasReachedAServer(invocation); 227 throw new ServerException ("Clustering error", gcex); 228 } 229 } 230 else 231 { 232 throw ((Exception ) response); 233 } 234 } 235 if(response instanceof MarshalledObject ) 236 { 237 haResponse = (HARMIResponse) ((MarshalledObject ) response).get(); 238 } 239 else 240 { 241 haResponse = (HARMIResponse) response; 242 } 243 244 if(haResponse.newReplicants != null) 246 { 247 updateClusterInfo(haResponse.newReplicants, haResponse.currentViewId); 248 } 249 250 response = haResponse.response; 251 return response; 252 253 } 254 catch(CannotConnectException cncEx) 255 { 256 log.debug("Invocation failed: CannotConnectException - " + cncEx, cncEx); 257 removeDeadTarget(getLocator()); 258 resetView(); 259 failoverAuthorized = txContextAllowsFailover(invocation); 260 261 failoverCounter++; 262 invocation.setValue("FAILOVER_COUNTER", new Integer (failoverCounter), PayloadKey.AS_IS); 263 } 264 catch(GenericClusteringException gcex) 265 { 266 lastException = gcex; 267 if(gcex.getCompletionStatus() == GenericClusteringException.COMPLETED_NO) 272 { 273 if(totalNumberOfTargets() >= failoverCounter) 277 { 278 if(!gcex.isDefinitive()) 279 { 280 definitivlyRemoveNodeOnFailure = false; 281 } 282 } 283 removeDeadTarget(getLocator()); 284 if(!definitivlyRemoveNodeOnFailure) 285 { 286 resetView(); 287 } 288 failoverAuthorized = txContextAllowsFailover(invocation); 289 290 failoverCounter++; 291 invocation.setValue("FAILOVER_COUNTER", new Integer (failoverCounter), PayloadKey.AS_IS); 292 293 log.debug("Received GenericClusteringException where request was not completed. Will retry."); 294 } 295 else 296 { 297 invocationHasReachedAServer(invocation); 298 throw new ServerException ("Clustering error", gcex); 299 } 300 } 301 catch(RemoteException aex) 302 { 303 log.debug("Invocation failed: RemoteException - " + aex, aex); 304 305 if(isStrictRMIException()) 307 { 308 throw new ServerException (aex.getMessage(), aex); 309 } 310 else 311 { 312 throw aex; 313 } 314 } 315 catch(Throwable throwable) 316 { 317 log.debug("Invocation failed: " + throwable, throwable); 318 319 if(throwable instanceof Exception ) 324 { 325 throw (Exception ) throwable; 326 } 327 throw new Exception (throwable); 328 } 329 } 330 331 if(failoverAuthorized == false) 332 { 333 throw new ServiceUnavailableException("Service unavailable (failover not possible inside a user transaction) for " + 334 invocation.getObjectName() + " calling method " + invocation.getMethod(), 335 lastException); 336 } 337 else 338 { 339 throw new ServiceUnavailableException("Service unavailable for " + 340 invocation.getObjectName() + " calling method " + invocation.getMethod(), 341 lastException); 342 } 343 } 344 345 private Object printPossibleTargets() 346 { 347 StringBuffer buffer = new StringBuffer (); 348 if(familyClusterInfo != null) 349 { 350 List possibleTargets = familyClusterInfo.getTargets(); 351 if(possibleTargets != null && possibleTargets.size() > 0) 352 { 353 for(int x = 0; x < possibleTargets.size(); x++) 354 { 355 buffer.append("\nPossible target " + (x + 1) + ": " + possibleTargets.get(x)); 356 } 357 } 358 } 359 return buffer.toString(); 360 } 361 362 private void removeDeadTarget(InvokerLocator locator) 363 { 364 if(locator != null) 365 { 366 if(this.familyClusterInfo != null) 367 { 368 familyClusterInfo.removeDeadTarget(locator); 369 log.debug("Removed " + locator + " from target list."); 370 } 371 } 372 } 373 374 375 private void updateClusterInfo(ArrayList newReplicants, long currentViewId) 376 { 377 if(familyClusterInfo != null) 378 { 379 familyClusterInfo.updateClusterInfo(newReplicants, currentViewId); 380 log.debug("Updating cluster info. New view id: " + currentViewId); 381 log.debug("New cluster target list is:"); 382 for(int x = 0; x < newReplicants.size(); x++) 383 { 384 log.debug(newReplicants.get(x)); 385 } 386 } 387 } 388 389 392 public void writeExternal(final ObjectOutput out) 393 throws IOException 394 { 395 out.writeInt(CURRENT_VERSION); 396 397 out.writeUTF(getLocator().getOriginalURI()); 398 out.writeBoolean(isStrictRMIException()); 399 List targets = null; 401 long vid = 0; 402 synchronized (this.familyClusterInfo) 403 { 404 targets = this.familyClusterInfo.getTargets (); 405 vid = this.familyClusterInfo.getCurrentViewId (); 406 } 407 out.writeObject(targets); 408 out.writeObject(this.loadBalancePolicy); 409 out.writeObject(this.proxyFamilyName); 410 out.writeLong(vid); 411 } 412 413 416 public void readExternal(final ObjectInput in) 417 throws IOException , ClassNotFoundException 418 { 419 int version = in.readInt(); 420 switch(version) 422 { 423 case VERSION_5_0: 424 setLocator(new InvokerLocator(in.readUTF())); 425 setStrictRMIException(in.readBoolean()); 426 init(getLocator()); 427 428 List targets = (List ) in.readObject(); 429 this.loadBalancePolicy = (LoadBalancePolicy) in.readObject(); 430 this.proxyFamilyName = (String ) in.readObject(); 431 long vid = in.readLong(); 432 433 this.familyClusterInfo = ClusteringTargetsRepository.initTarget(this.proxyFamilyName, targets, vid); 436 437 break; 438 default: 439 throw new StreamCorruptedException ("Unknown version seen: " + version); 440 } 441 } 442 443 444 } | Popular Tags |