KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > invocation > unified > interfaces > UnifiedInvokerHAProxy


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.invocation.unified.interfaces;
23
24 import java.io.IOException JavaDoc;
25 import java.io.ObjectInput JavaDoc;
26 import java.io.ObjectOutput JavaDoc;
27 import java.io.StreamCorruptedException JavaDoc;
28 import java.net.MalformedURLException JavaDoc;
29 import java.rmi.MarshalledObject JavaDoc;
30 import java.rmi.RemoteException JavaDoc;
31 import java.rmi.ServerException JavaDoc;
32 import java.util.ArrayList JavaDoc;
33 import java.util.List JavaDoc;
34 import java.util.WeakHashMap JavaDoc;
35 import org.jboss.ha.framework.interfaces.ClusteringTargetsRepository;
36 import org.jboss.ha.framework.interfaces.FamilyClusterInfo;
37 import org.jboss.ha.framework.interfaces.GenericClusteringException;
38 import org.jboss.ha.framework.interfaces.HARMIResponse;
39 import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
40 import org.jboss.invocation.Invocation;
41 import org.jboss.invocation.PayloadKey;
42 import org.jboss.invocation.ServiceUnavailableException;
43 import org.jboss.remoting.CannotConnectException;
44 import org.jboss.remoting.Client;
45 import org.jboss.remoting.InvokerLocator;
46
47 /**
48  * @author <a HREF="mailto:tom.elrod@jboss.com">Tom Elrod</a>
49  */

50 public class UnifiedInvokerHAProxy extends UnifiedInvokerProxy
51 {
52
53    static final long serialVersionUID = -4813929243402349966L;
54
55    private LoadBalancePolicy loadBalancePolicy;
56    private String JavaDoc proxyFamilyName = null;
57
58    private FamilyClusterInfo familyClusterInfo = null;
59
60    public static final WeakHashMap JavaDoc txFailoverAuthorizations = new WeakHashMap JavaDoc();
61
62
63    public UnifiedInvokerHAProxy()
64    {
65       super();
66       log.debug("UnifiedInvokerHAProxy constructor called with no arguments.");
67       setSubSystem("invokerha");
68    }
69
70    public UnifiedInvokerHAProxy(InvokerLocator locator, boolean isStrictRMIException,
71                                 List JavaDoc targets, LoadBalancePolicy policy,
72                                 String JavaDoc proxyFamilyName, long viewId)
73    {
74       super(locator, isStrictRMIException);
75
76       this.familyClusterInfo = ClusteringTargetsRepository.initTarget(proxyFamilyName, targets, viewId);
77       this.loadBalancePolicy = policy;
78       this.proxyFamilyName = proxyFamilyName;
79
80       setSubSystem("invokerha");
81    }
82
83    public boolean txContextAllowsFailover(Invocation invocation)
84    {
85       javax.transaction.Transaction JavaDoc tx = invocation.getTransaction();
86       if(tx != null)
87       {
88          synchronized(tx)
89          {
90             return ! txFailoverAuthorizations.containsKey(tx);
91          }
92       }
93       else
94       {
95          return true;
96       }
97    }
98
99    public void invocationHasReachedAServer(Invocation invocation)
100    {
101       javax.transaction.Transaction JavaDoc tx = invocation.getTransaction();
102       if(tx != null)
103       {
104          synchronized(tx)
105          {
106             txFailoverAuthorizations.put(tx, null);
107          }
108       }
109    }
110
111    protected int totalNumberOfTargets()
112    {
113       if(this.familyClusterInfo != null)
114       {
115          return this.familyClusterInfo.getTargets().size();
116       }
117       else
118       {
119          return 0;
120       }
121    }
122
123    protected void resetView()
124    {
125       this.familyClusterInfo.resetView();
126    }
127
128    /**
129     * Gets the remoting client to call on which is selected by the load balancing policy.
130     * If the target InvokerLocator selected is not for the current remoting client, a new one
131     * will be initialized.
132     *
133     * @param invocationBasedRouting
134     * @return
135     * @throws MalformedURLException
136     */

137    protected Client getClient(Invocation invocationBasedRouting) throws MalformedURLException JavaDoc
138    {
139       Object JavaDoc target = loadBalancePolicy.chooseTarget(familyClusterInfo, invocationBasedRouting);
140       InvokerLocator targetLocator = (InvokerLocator) target;
141
142       // check if load balancer pick the client invoker we already have
143
if(!getLocator().equals(targetLocator))
144       {
145          init(targetLocator);
146       }
147       return getClient();
148    }
149
150
151    /**
152     * @param invocation A pointer to the invocation object
153     * @return Return value of method invocation.
154     * @throws Exception Failed to invoke method.
155     */

156    public Object JavaDoc invoke(Invocation invocation) throws Exception JavaDoc
157    {
158       // we give the opportunity, to any server interceptor, to know if this a
159
// first invocation to a node or if it is a failovered call
160
//
161
int failoverCounter = 0;
162       invocation.setValue("FAILOVER_COUNTER", new Integer JavaDoc(failoverCounter), PayloadKey.AS_IS);
163
164       Object JavaDoc response = null;
165       Exception JavaDoc lastException = null;
166
167       boolean failoverAuthorized = true;
168       while(familyClusterInfo.getTargets() != null && familyClusterInfo.getTargets().size() > 0 && failoverAuthorized)
169       {
170          boolean definitivlyRemoveNodeOnFailure = true;
171
172          try
173          {
174             invocation.setValue("CLUSTER_VIEW_ID", new Long JavaDoc(this.familyClusterInfo.getCurrentViewId()));
175
176             log.debug("Client cluster view id: " + familyClusterInfo.getCurrentViewId());
177             log.debug(printPossibleTargets());
178
179             Client clientInstance = getClient(invocation);
180
181             log.debug("Making invocation on " + clientInstance.getInvoker().getLocator());
182
183             response = clientInstance.invoke(invocation, null);
184
185             HARMIResponse haResponse = null;
186
187             if(response instanceof Exception JavaDoc)
188             {
189                log.debug("Invocation returened exception: " + response);
190                if(response instanceof GenericClusteringException)
191                {
192                   GenericClusteringException gcex = (GenericClusteringException) response;
193                   lastException = gcex;
194                   // this is a generic clustering exception that contain the
195
// completion status: usefull to determine if we are authorized
196
// to re-issue a query to another node
197
//
198
if(gcex.getCompletionStatus() == GenericClusteringException.COMPLETED_NO)
199                   {
200                      // we don't want to remove the node from the list of failed
201
// node UNLESS there is a risk to indefinitively loop
202
//
203
if(totalNumberOfTargets() >= failoverCounter)
204                      {
205                         if(!gcex.isDefinitive())
206                         {
207                            definitivlyRemoveNodeOnFailure = false;
208                         }
209                      }
210                      removeDeadTarget(getLocator());
211                      if(!definitivlyRemoveNodeOnFailure)
212                      {
213                         resetView();
214                      }
215                      failoverAuthorized = txContextAllowsFailover(invocation);
216
217                      failoverCounter++;
218                      invocation.setValue("FAILOVER_COUNTER", new Integer JavaDoc(failoverCounter), PayloadKey.AS_IS);
219
220                      log.debug("Received GenericClusteringException where request was not completed. Will retry.");
221
222                      continue;
223                   }
224                   else
225                   {
226                      invocationHasReachedAServer(invocation);
227                      throw new ServerException JavaDoc("Clustering error", gcex);
228                   }
229                }
230                else
231                {
232                   throw ((Exception JavaDoc) response);
233                }
234             }
235             if(response instanceof MarshalledObject JavaDoc)
236             {
237                haResponse = (HARMIResponse) ((MarshalledObject JavaDoc) response).get();
238             }
239             else
240             {
241                haResponse = (HARMIResponse) response;
242             }
243
244             // check for clustered targets
245
if(haResponse.newReplicants != null)
246             {
247                updateClusterInfo(haResponse.newReplicants, haResponse.currentViewId);
248             }
249
250             response = haResponse.response;
251             return response;
252
253          }
254          catch(CannotConnectException cncEx)
255          {
256             log.debug("Invocation failed: CannotConnectException - " + cncEx, cncEx);
257             removeDeadTarget(getLocator());
258             resetView();
259             failoverAuthorized = txContextAllowsFailover(invocation);
260
261             failoverCounter++;
262             invocation.setValue("FAILOVER_COUNTER", new Integer JavaDoc(failoverCounter), PayloadKey.AS_IS);
263          }
264          catch(GenericClusteringException gcex)
265          {
266             lastException = gcex;
267             // this is a generic clustering exception that contain the
268
// completion status: usefull to determine if we are authorized
269
// to re-issue a query to another node
270
//
271
if(gcex.getCompletionStatus() == GenericClusteringException.COMPLETED_NO)
272             {
273                // we don't want to remove the node from the list of failed
274
// node UNLESS there is a risk to indefinitively loop
275
//
276
if(totalNumberOfTargets() >= failoverCounter)
277                {
278                   if(!gcex.isDefinitive())
279                   {
280                      definitivlyRemoveNodeOnFailure = false;
281                   }
282                }
283                removeDeadTarget(getLocator());
284                if(!definitivlyRemoveNodeOnFailure)
285                {
286                   resetView();
287                }
288                failoverAuthorized = txContextAllowsFailover(invocation);
289
290                failoverCounter++;
291                invocation.setValue("FAILOVER_COUNTER", new Integer JavaDoc(failoverCounter), PayloadKey.AS_IS);
292
293                log.debug("Received GenericClusteringException where request was not completed. Will retry.");
294             }
295             else
296             {
297                invocationHasReachedAServer(invocation);
298                throw new ServerException JavaDoc("Clustering error", gcex);
299             }
300          }
301          catch(RemoteException JavaDoc aex)
302          {
303             log.debug("Invocation failed: RemoteException - " + aex, aex);
304
305             // per Jira issue JBREM-61
306
if(isStrictRMIException())
307             {
308                throw new ServerException JavaDoc(aex.getMessage(), aex);
309             }
310             else
311             {
312                throw aex;
313             }
314          }
315          catch(Throwable JavaDoc throwable)
316          {
317             log.debug("Invocation failed: " + throwable, throwable);
318
319             // this is somewhat of a hack as remoting throws throwable,
320
// so will let Exception types bubble up, but if Throwable type,
321
// then have to wrap in new Exception, as this is the signature
322
// of this invoke method.
323
if(throwable instanceof Exception JavaDoc)
324             {
325                throw (Exception JavaDoc) throwable;
326             }
327             throw new Exception JavaDoc(throwable);
328          }
329       }
330
331       if(failoverAuthorized == false)
332       {
333          throw new ServiceUnavailableException("Service unavailable (failover not possible inside a user transaction) for " +
334                                                invocation.getObjectName() + " calling method " + invocation.getMethod(),
335                                                lastException);
336       }
337       else
338       {
339          throw new ServiceUnavailableException("Service unavailable for " +
340                                                invocation.getObjectName() + " calling method " + invocation.getMethod(),
341                                                lastException);
342       }
343    }
344
345    private Object JavaDoc printPossibleTargets()
346    {
347       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
348       if(familyClusterInfo != null)
349       {
350          List JavaDoc possibleTargets = familyClusterInfo.getTargets();
351          if(possibleTargets != null && possibleTargets.size() > 0)
352          {
353             for(int x = 0; x < possibleTargets.size(); x++)
354             {
355                buffer.append("\nPossible target " + (x + 1) + ": " + possibleTargets.get(x));
356             }
357          }
358       }
359       return buffer.toString();
360    }
361
362    private void removeDeadTarget(InvokerLocator locator)
363    {
364       if(locator != null)
365       {
366          if(this.familyClusterInfo != null)
367          {
368             familyClusterInfo.removeDeadTarget(locator);
369             log.debug("Removed " + locator + " from target list.");
370          }
371       }
372    }
373
374
375    private void updateClusterInfo(ArrayList JavaDoc newReplicants, long currentViewId)
376    {
377       if(familyClusterInfo != null)
378       {
379          familyClusterInfo.updateClusterInfo(newReplicants, currentViewId);
380          log.debug("Updating cluster info. New view id: " + currentViewId);
381          log.debug("New cluster target list is:");
382          for(int x = 0; x < newReplicants.size(); x++)
383          {
384             log.debug(newReplicants.get(x));
385          }
386       }
387    }
388
389    /**
390     * Externalize this instance and handle obtaining the remoteInvoker stub
391     */

392    public void writeExternal(final ObjectOutput JavaDoc out)
393          throws IOException JavaDoc
394    {
395       out.writeInt(CURRENT_VERSION);
396
397       out.writeUTF(getLocator().getOriginalURI());
398       out.writeBoolean(isStrictRMIException());
399       // JBAS-2071 - sync on FCI to ensure targets and vid are consistent
400
List JavaDoc targets = null;
401       long vid = 0;
402       synchronized (this.familyClusterInfo)
403       {
404          targets = this.familyClusterInfo.getTargets ();
405          vid = this.familyClusterInfo.getCurrentViewId ();
406       }
407       out.writeObject(targets);
408       out.writeObject(this.loadBalancePolicy);
409       out.writeObject(this.proxyFamilyName);
410       out.writeLong(vid);
411    }
412
413    /**
414     * Un-externalize this instance.
415     */

416    public void readExternal(final ObjectInput JavaDoc in)
417          throws IOException JavaDoc, ClassNotFoundException JavaDoc
418    {
419       int version = in.readInt();
420       // Read in and map the version of the serialized data seen
421
switch(version)
422       {
423          case VERSION_5_0:
424             setLocator(new InvokerLocator(in.readUTF()));
425             setStrictRMIException(in.readBoolean());
426             init(getLocator());
427
428             List JavaDoc targets = (List JavaDoc) in.readObject();
429             this.loadBalancePolicy = (LoadBalancePolicy) in.readObject();
430             this.proxyFamilyName = (String JavaDoc) in.readObject();
431             long vid = in.readLong();
432
433             // keep a reference on our family object
434
//
435
this.familyClusterInfo = ClusteringTargetsRepository.initTarget(this.proxyFamilyName, targets, vid);
436
437             break;
438          default:
439             throw new StreamCorruptedException JavaDoc("Unknown version seen: " + version);
440       }
441    }
442
443
444 }
Popular Tags