KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ha > framework > server > HAPartitionImpl


1 /**
2  * JBoss, the OpenSource J2EE WebOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7
8 package org.jboss.ha.framework.server;
9
10 import java.io.ByteArrayInputStream JavaDoc;
11 import java.io.ByteArrayOutputStream JavaDoc;
12 import java.io.Serializable JavaDoc;
13 import java.text.SimpleDateFormat JavaDoc;
14 import java.util.ArrayList JavaDoc;
15 import java.util.Date JavaDoc;
16 import java.util.HashMap JavaDoc;
17 import java.util.Iterator JavaDoc;
18 import java.util.Vector JavaDoc;
19
20 import javax.naming.Context JavaDoc;
21 import javax.naming.InitialContext JavaDoc;
22 import javax.naming.Name JavaDoc;
23 import javax.naming.NameNotFoundException JavaDoc;
24 import javax.naming.Reference JavaDoc;
25 import javax.naming.StringRefAddr JavaDoc;
26 import javax.management.MBeanServer JavaDoc;
27
28 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
29
30 import org.jgroups.JChannel;
31 import org.jgroups.MergeView;
32 import org.jgroups.View;
33 import org.jgroups.Message;
34 import org.jgroups.blocks.GroupRequest;
35 import org.jgroups.blocks.MethodCall;
36 import org.jgroups.stack.IpAddress;
37 import org.jgroups.util.Rsp;
38 import org.jgroups.util.RspList;
39 import org.jgroups.util.Util;
40
41 import org.jboss.invocation.MarshalledValueInputStream;
42 import org.jboss.invocation.MarshalledValueOutputStream;
43 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
44 import org.jboss.ha.framework.interfaces.DistributedState;
45 import org.jboss.ha.framework.interfaces.HAPartition;
46 import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
47 import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
48 import org.jboss.ha.framework.interfaces.ClusterNode;
49
50 import org.jboss.naming.NonSerializableFactory;
51 import org.jboss.logging.Logger;
52
53 /**
54  * This class is an abstraction class for a JGroups RPCDispatch and JChannel.
55  * It is a default implementation of HAPartition for the
56  * <a HREF="http://www.jgroups.com/">JGroups</A> framework
57  *
58  * @author <a HREF="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
59  * @author <a HREF="mailto:bill@burkecentral.com">Bill Burke</a>.
60  * @author Scott.Stark@jboss.org
61  * @version $Revision: 1.43.2.6 $
62  */

63 public class HAPartitionImpl
64    extends org.jgroups.blocks.RpcDispatcher
65    implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
66       HAPartition
67 {
68    private static class NoHandlerForRPC implements Serializable JavaDoc
69    {
70       static final long serialVersionUID = -1263095408483622838L;
71    }
72
73    // Constants -----------------------------------------------------
74

75    // final MethodLookup method_lookup_clos = new MethodLookupClos();
76

77    // Attributes ----------------------------------------------------
78

79    protected HashMap JavaDoc rpcHandlers = new HashMap JavaDoc();
80    protected HashMap JavaDoc stateHandlers = new HashMap JavaDoc();
81    /** The HAMembershipListener and HAMembershipExtendedListeners */
82    protected ArrayList JavaDoc listeners = new ArrayList JavaDoc();
83    /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
84    protected ArrayList JavaDoc asynchListeners = new ArrayList JavaDoc();
85    /** The LinkedQueue<ViewChangeEvent> of changes to notify asynch listeners of */
86    protected LinkedQueue asynchViewChanges = new LinkedQueue();
87    /** The Thread used to send membership change notifications asynchronously */
88    protected Thread JavaDoc asynchNotifyThread;
89    /** The current cluster partition members */
90    protected Vector JavaDoc members = null;
91    protected Vector JavaDoc jgmembers = null;
92
93    public Vector JavaDoc history = null;
94
95    /** The partition members other than this node */
96    protected Vector JavaDoc otherMembers = null;
97    protected Vector JavaDoc jgotherMembers = null;
98    /** The JChannel name */
99    protected String JavaDoc partitionName;
100    /** the local JG IP Address */
101    protected org.jgroups.stack.IpAddress localJGAddress = null;
102    /** The cluster transport protocol address string */
103    protected String JavaDoc nodeName;
104    /** me as a ClusterNode */
105    protected ClusterNode me = null;
106    /** The timeout for cluster RPC calls */
107    protected long timeout = 60000;
108    /** The JGroups partition channel */
109    protected JChannel channel;
110    /** The cluster replicant manager */
111    protected DistributedReplicantManagerImpl replicantManager;
112    /** The cluster state manager */
113    protected DistributedStateImpl dsManager;
114    /** The cluster instance log category */
115    protected Logger log;
116    protected Logger clusterLifeCycleLog;
117    /** The current cluster view id */
118    protected long currentViewId = -1;
119    /** The JMX MBeanServer to use for registrations */
120    protected MBeanServer JavaDoc server;
121    /** Number of ms to wait for state */
122    protected long state_transfer_timeout=60000;
123
124    // Static --------------------------------------------------------
125

126    /**
127     * Creates an object from a byte buffer
128     */

129    public static Object JavaDoc objectFromByteBuffer (byte[] buffer) throws Exception JavaDoc
130    {
131       if(buffer == null)
132          return null;
133
134       ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(buffer);
135       MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
136       return mvis.readObject();
137    }
138    
139    /**
140     * Serializes an object into a byte buffer.
141     * The object has to implement interface Serializable or Externalizable
142     */

143    public static byte[] objectToByteBuffer (Object JavaDoc obj) throws Exception JavaDoc
144    {
145       ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
146       MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
147       mvos.writeObject(obj);
148       mvos.flush();
149       return baos.toByteArray();
150    }
151
152    public long getStateTransferTimeout() {
153       return state_transfer_timeout;
154    }
155
156    public void setStateTransferTimeout(long state_transfer_timeout) {
157       this.state_transfer_timeout=state_transfer_timeout;
158    }
159
160
161    public long getMethodCallTimeout() {
162       return timeout;
163    }
164
165    public void setMethodCallTimeout(long timeout) {
166       this.timeout=timeout;
167    }
168
169     // Constructors --------------------------------------------------
170

171    public HAPartitionImpl(String JavaDoc partitionName, org.jgroups.JChannel channel, boolean deadlock_detection, MBeanServer JavaDoc server) throws Exception JavaDoc
172    {
173       this(partitionName, channel, deadlock_detection);
174       this.server = server;
175    }
176    
177    public HAPartitionImpl(String JavaDoc partitionName, org.jgroups.JChannel channel, boolean deadlock_detection) throws Exception JavaDoc
178    {
179       super(channel, null, null, new Object JavaDoc(), deadlock_detection); // init RpcDispatcher with a fake target object
180
this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
181       this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
182       this.channel = channel;
183       this.partitionName = partitionName;
184       this.history = new Vector JavaDoc();
185       logHistory ("Partition object created");
186    }
187    
188     // Public --------------------------------------------------------
189

190    public void init() throws Exception JavaDoc
191    {
192       log.info("Initializing");
193       logHistory ("Initializing partition");
194
195       // Subscribe to dHA events comming generated by the org.jgroups. protocol stack
196
//
197
log.debug("setMembershipListener");
198       setMembershipListener(this);
199       log.debug("setMessageListener");
200       setMessageListener(this);
201       
202       // Create the DRM and link it to this HAPartition
203
//
204
log.debug("create replicant manager");
205       this.replicantManager = new DistributedReplicantManagerImpl(this, this.server);
206       log.debug("init replicant manager");
207       this.replicantManager.init();
208       log.debug("bind replicant manager");
209       
210       // Create the DS and link it to this HAPartition
211
//
212
log.debug("create distributed state");
213       this.dsManager = new DistributedStateImpl(this, this.server);
214       log.debug("init distributed state service");
215       this.dsManager.init();
216       log.debug("bind distributed state service");
217
218       
219       // Bind ourself in the public JNDI space
220
//
221
Context JavaDoc ctx = new InitialContext JavaDoc();
222       this.bind("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);
223       
224       log.debug("done initing.");
225    }
226    
227    public void startPartition() throws Exception JavaDoc
228    {
229       // get current JG group properties
230
//
231
logHistory ("Starting partition");
232       log.debug("get nodeName");
233       this.localJGAddress = (IpAddress)channel.getLocalAddress();
234       this.me = new ClusterNode(this.localJGAddress);
235       this.nodeName = this.me.getName();
236
237       log.debug("Get current members");
238       View view = channel.getView();
239       this.jgmembers = (Vector JavaDoc)view.getMembers().clone();
240       this.members = translateAddresses(this.jgmembers); // TRANSLATE
241
log.info("Number of cluster members: " + members.size());
242       for(int m = 0; m > members.size(); m ++)
243       {
244          Object JavaDoc node = members.get(m);
245          log.debug(node);
246       }
247       // Keep a list of other members only for "exclude-self" RPC calls
248
//
249
this.jgotherMembers = (Vector JavaDoc)view.getMembers().clone();
250       this.jgotherMembers.remove (channel.getLocalAddress());
251       this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
252
log.info ("Other members: " + this.otherMembers.size ());
253
254       verifyNodeIsUnique (view.getMembers());
255
256       // Update the initial view id
257
//
258
this.currentViewId = view.getVid().getId();
259
260       // We must now synchronize new state transfer subscriber
261
//
262
log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
263       boolean rc = channel.getState(null, this.state_transfer_timeout);
264       if (rc)
265          log.debug("State was retrieved successfully");
266       else
267          log.debug("State could not be retrieved, (must be first member of group)");
268       
269       // We start now able to start our DRM and DS
270
//
271
this.replicantManager.start();
272       this.dsManager.start();
273
274       // Create the asynch listener handler thread
275
AsynchViewChangeHandler asynchHandler = new AsynchViewChangeHandler();
276       asynchNotifyThread = new Thread JavaDoc(asynchHandler, "AsynchHAMembershipListener Thread");
277       asynchNotifyThread.start();
278    }
279
280    public void closePartition() throws Exception JavaDoc
281    {
282       logHistory ("Closing partition");
283       log.info("Closing partition " + partitionName);
284
285       try
286       {
287          asynchNotifyThread.interrupt();
288       }
289       catch( Exception JavaDoc e)
290       {
291          log.warn("Failed to interrupte asynchNotifyThread", e);
292       }
293
294       // Stop the DRM and DS services
295
//
296
try
297       {
298          this.replicantManager.stop();
299       }
300       catch (Exception JavaDoc e)
301       {
302          log.error("operation failed", e);
303       }
304
305       try
306       {
307          this.dsManager.stop();
308       }
309       catch (Exception JavaDoc e)
310       {
311          log.error("operation failed", e);
312       }
313
314 // NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
315
// add the destroyPartition() step
316
try
317       {
318 // channel.close();
319
channel.disconnect();
320       }
321       catch (Exception JavaDoc e)
322       {
323          log.error("operation failed", e);
324       }
325
326 // String boundName = "/HAPartition/" + partitionName;
327
//
328
// InitialContext ctx = new InitialContext();
329
// try
330
// {
331
//
332
// ctx.unbind(boundName);
333
// }
334
// finally
335
// {
336
// ctx.close();
337
// }
338
// NonSerializableFactory.unbind (boundName);
339

340       log.info("Partition " + partitionName + " closed.");
341    }
342
343 // NR 200505 : [JBCLUSTER-38] destroy partition close the channel
344
public void destroyPartition() throws Exception JavaDoc
345    {
346
347       try
348       {
349          this.replicantManager.destroy();
350       }
351       catch (Exception JavaDoc e)
352       {
353          log.error("operation failed", e);
354       }
355
356       try
357       {
358          this.dsManager.destroy();
359       }
360       catch (Exception JavaDoc e)
361       {
362          log.error("operation failed", e);
363       }
364       try
365       {
366          channel.close();
367       }
368       catch (Exception JavaDoc e)
369       {
370          log.error("operation failed", e);
371       }
372       
373        String JavaDoc boundName = "/HAPartition/" + partitionName;
374
375       InitialContext JavaDoc ctx = new InitialContext JavaDoc();
376       try
377       {
378          
379          ctx.unbind(boundName);
380       }
381       finally
382       {
383          ctx.close();
384       }
385       NonSerializableFactory.unbind (boundName);
386
387       log.info("Partition " + partitionName + " destroyed.");
388   }
389
390    // org.jgroups.MessageListener implementation ----------------------------------------------
391

392    // MessageListener methods
393
//
394
public byte[] getState()
395    {
396       logHistory ("getState called on partition");
397       boolean debug = log.isDebugEnabled();
398       
399       log.debug("getState called.");
400       try
401       {
402          // we now get the sub-state of each HAPartitionStateTransfer subscribers and
403
// build a "macro" state
404
//
405
HashMap JavaDoc state = new HashMap JavaDoc();
406          Iterator JavaDoc keys = stateHandlers.keySet().iterator();
407          while (keys.hasNext())
408          {
409             String JavaDoc key = (String JavaDoc)keys.next();
410             HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
411             if (debug)
412                log.debug("getState for " + key);
413             state.put(key, subscriber.getCurrentState());
414          }
415          return objectToByteBuffer(state);
416       }
417       catch (Exception JavaDoc ex)
418       {
419          log.error("getState failed", ex);
420       }
421       return null;
422    }
423    
424    public void setState(byte[] obj)
425    {
426       logHistory ("setState called on partition");
427       try
428       {
429          log.debug("setState called");
430          if (obj == null)
431          {
432             log.debug("state is null");
433             return;
434          }
435          
436          long used_mem_before, used_mem_after;
437          int state_size=obj != null? obj.length : 0;
438          Runtime JavaDoc rt=Runtime.getRuntime();
439          used_mem_before=rt.totalMemory() - rt.freeMemory();
440
441          HashMap JavaDoc state = (HashMap JavaDoc)objectFromByteBuffer(obj);
442          java.util.Iterator JavaDoc keys = state.keySet().iterator();
443          while (keys.hasNext())
444          {
445             String JavaDoc key = (String JavaDoc)keys.next();
446             log.debug("setState for " + key);
447             Object JavaDoc someState = state.get(key);
448             HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
449             if (subscriber != null)
450             {
451                subscriber.setCurrentState((java.io.Serializable JavaDoc)someState);
452             }
453             else
454             {
455                log.debug("There is no stateHandler for: " + key);
456             }
457          }
458
459          used_mem_after=rt.totalMemory() - rt.freeMemory();
460          log.debug("received a state of " + state_size + " bytes; expanded memory by " +
461                (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
462                ", used memory after: " + used_mem_after + ")");
463       }
464       catch (Exception JavaDoc ex)
465       {
466          log.error("setState failed", ex);
467       }
468    }
469    
470    public void receive(org.jgroups.Message msg)
471    { /* complete */}
472    
473    // org.jgroups.MembershipListener implementation ----------------------------------------------
474

475    public void suspect(org.jgroups.Address suspected_mbr)
476    {
477       logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
478       if (isCurrentNodeCoordinator ())
479          clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
480       else
481          log.info("Suspected member: " + suspected_mbr);
482    }
483
484    public void block() {}
485    
486    /** Notification of a cluster view change. This is done from the JG protocol
487     * handlder thread and we must be careful to not unduly block this thread.
488     * Because of this there are two types of listeners, synchronous and
489     * asynchronous. The synchronous listeners are messaged with the view change
490     * event using the calling thread while the asynchronous listeners are
491     * messaged using a seperate thread.
492     *
493     * @param newView
494     */

495    public void viewAccepted(View newView)
496    {
497       try
498       {
499          // we update the view id
500
//
501
this.currentViewId = newView.getVid().getId();
502
503          // Keep a list of other members only for "exclude-self" RPC calls
504
//
505
this.jgotherMembers = (Vector JavaDoc)newView.getMembers().clone();
506          this.jgotherMembers.remove (channel.getLocalAddress());
507          this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
508
Vector JavaDoc translatedNewView = translateAddresses ((Vector JavaDoc)newView.getMembers().clone());
509          logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
510                      " (old view: " + this.members + " )");
511
512
513          // Save the previous view and make a copy of the new view
514
Vector JavaDoc oldMembers = this.members;
515
516          Vector JavaDoc newjgMembers = (Vector JavaDoc)newView.getMembers().clone();
517          Vector JavaDoc newMembers = translateAddresses(newjgMembers); // TRANSLATE
518
if (this.members == null)
519          {
520             // Initial viewAccepted
521
this.members = newMembers;
522             this.jgmembers = newjgMembers;
523             log.debug("ViewAccepted: initial members set");
524             return;
525          }
526          this.members = newMembers;
527          this.jgmembers = newjgMembers;
528
529          int difference = 0;
530          if (oldMembers == null)
531             difference = newMembers.size () - 1;
532          else
533             difference = newMembers.size () - oldMembers.size ();
534          
535          if (isCurrentNodeCoordinator ())
536             clusterLifeCycleLog.info ("New cluster view for partition " + this.partitionName + " (id: " +
537                                       this.currentViewId + ", delta: " + difference + ") : " + this.members);
538          else
539             log.info("New cluster view for partition " + this.partitionName + ": " +
540                      this.currentViewId + " (" + this.members + " delta: " + difference + ")");
541
542          // Build a ViewChangeEvent for the asynch listeners
543
ViewChangeEvent event = new ViewChangeEvent();
544          event.viewId = currentViewId;
545          event.allMembers = translatedNewView;
546          event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
547          event.newMembers = getNewMembers(oldMembers, event.allMembers);
548          event.originatingGroups = null;
549          // if the new view occurs because of a merge, we first inform listeners of the merge
550
if(newView instanceof MergeView)
551          {
552             MergeView mergeView = (MergeView) newView;
553             event.originatingGroups = mergeView.getSubgroups();
554          }
555
556          log.debug("membership changed from " + this.members.size() + " to "
557             + event.allMembers.size());
558          // Put the view change to the asynch queue
559
this.asynchViewChanges.put(event);
560
561          // Broadcast the new view to the synchronous view change listeners
562
this.notifyListeners(listeners, event.viewId, event.allMembers,
563             event.deadMembers, event.newMembers, event.originatingGroups);
564       }
565       catch (Exception JavaDoc ex)
566       {
567          log.error("ViewAccepted failed", ex);
568       }
569    }
570
571    // HAPartition implementation ----------------------------------------------
572

573    public String JavaDoc getNodeName()
574    {
575       return nodeName;
576    }
577    
578    public String JavaDoc getPartitionName()
579    {
580       return partitionName;
581    }
582    
583    public DistributedReplicantManager getDistributedReplicantManager()
584    {
585       return replicantManager;
586    }
587    
588    public DistributedState getDistributedStateService()
589    {
590       return this.dsManager;
591    }
592
593    public long getCurrentViewId()
594    {
595       return this.currentViewId;
596    }
597    
598    public Vector JavaDoc getCurrentView()
599    {
600       Vector JavaDoc result = new Vector JavaDoc (this.members.size());
601       for (int i = 0; i < members.size(); i++)
602       {
603          result.add( ((ClusterNode) members.elementAt(i)).getName() );
604       }
605       return result;
606    }
607
608    public ClusterNode[] getClusterNodes ()
609    {
610       ClusterNode[] nodes = new ClusterNode[this.members.size()];
611       this.members.toArray(nodes);
612       return nodes;
613    }
614
615    public ClusterNode getClusterNode ()
616    {
617       return me;
618    }
619
620    public boolean isCurrentNodeCoordinator ()
621    {
622       if(this.members == null || this.members.size() == 0 || this.me == null)
623          return false;
624      return this.members.elementAt (0).equals (this.me);
625    }
626
627    // ***************************
628
// ***************************
629
// RPC multicast communication
630
// ***************************
631
// ***************************
632
//
633
public void registerRPCHandler(String JavaDoc objName, Object JavaDoc subscriber)
634    {
635       rpcHandlers.put(objName, subscriber);
636    }
637    
638    public void unregisterRPCHandler(String JavaDoc objName, Object JavaDoc subscriber)
639    {
640       rpcHandlers.remove(objName);
641    }
642       
643
644    /**
645     *
646     * @param objName
647     * @param methodName
648     * @param args
649     * @param excludeSelf
650     * @return
651     * @throws Exception
652     * @deprecated Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead
653     */

654    public ArrayList JavaDoc callMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
655       Object JavaDoc[] args, boolean excludeSelf) throws Exception JavaDoc
656    {
657       return callMethodOnCluster(objName, methodName, args, null, excludeSelf);
658    }
659
660    /**
661     * This function is an abstraction of RpcDispatcher.
662     */

663    public ArrayList JavaDoc callMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
664       Object JavaDoc[] args, Class JavaDoc[] types, boolean excludeSelf) throws Exception JavaDoc
665    {
666       return callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.timeout);
667    }
668
669
670    public ArrayList JavaDoc callMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
671        Object JavaDoc[] args, Class JavaDoc[] types, boolean excludeSelf, long methodTimeout) throws Exception JavaDoc
672    {
673       ArrayList JavaDoc rtn = new ArrayList JavaDoc();
674       MethodCall m=null;
675       RspList rsp = null;
676       boolean trace = log.isTraceEnabled();
677
678       if(types != null)
679          m=new MethodCall(objName + "." + methodName, args, types);
680       else
681          m=new MethodCall(objName + "." + methodName, args);
682
683       if (excludeSelf)
684       {
685          if( trace )
686          {
687             log.trace("callMethodOnCluster(true), objName="+objName
688                +", methodName="+methodName+", members="+jgotherMembers);
689          }
690          rsp = this.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
691       }
692       else
693       {
694          if( trace )
695          {
696             log.trace("callMethodOnCluster(false), objName="+objName
697                +", methodName="+methodName+", members="+members);
698          }
699          rsp = this.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
700       }
701
702       if (rsp != null)
703       {
704          for (int i = 0; i < rsp.size(); i++)
705          {
706             Object JavaDoc item = rsp.elementAt(i);
707             if (item instanceof Rsp)
708             {
709                Rsp response = (Rsp) item;
710                // Only include received responses
711
boolean wasReceived = response.wasReceived();
712                if( wasReceived == true )
713                {
714                   item = response.getValue();
715                   if (!(item instanceof NoHandlerForRPC))
716                      rtn.add(item);
717                }
718                else if( trace )
719                   log.trace("Ignoring non-received response: "+response);
720             }
721             else
722             {
723                if (!(item instanceof NoHandlerForRPC))
724                   rtn.add(item);
725                else if( trace )
726                   log.trace("Ignoring NoHandlerForRPC");
727             }
728          }
729       }
730
731       return rtn;
732     }
733
734    /**
735     * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
736     * cluster.
737     * and is replaced
738     * @param objName
739     * @param methodName
740     * @param args
741     * @param types
742     * @param excludeSelf
743     * @return
744     * @throws Exception
745     */

746    public ArrayList JavaDoc callMethodOnCoordinatorNode(String JavaDoc objName, String JavaDoc methodName,
747           Object JavaDoc[] args, Class JavaDoc[] types,boolean excludeSelf) throws Exception JavaDoc
748       {
749       return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf,this.timeout);
750       }
751
752    /**
753     * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
754     * cluster.
755     * and is replaced
756     * @param objName
757     * @param methodName
758     * @param args
759     * @param types
760     * @param excludeSelf
761     * @param methodTimeout
762     * @return
763     * @throws Exception
764     */

765    public ArrayList JavaDoc callMethodOnCoordinatorNode(String JavaDoc objName, String JavaDoc methodName,
766           Object JavaDoc[] args, Class JavaDoc[] types,boolean excludeSelf, long methodTimeout) throws Exception JavaDoc
767       {
768          ArrayList JavaDoc rtn = new ArrayList JavaDoc();
769          MethodCall m=null;
770          RspList rsp = null;
771          boolean trace = log.isTraceEnabled();
772
773          if(types != null)
774             m=new MethodCall(objName + "." + methodName, args, types);
775          else
776             m=new MethodCall(objName + "." + methodName, args);
777
778          if( trace )
779          {
780             log.trace("callMethodOnCoordinatorNode(false), objName="+objName
781                +", methodName="+methodName);
782          }
783
784          // the first cluster view member is the coordinator
785
Vector JavaDoc coordinatorOnly = new Vector JavaDoc();
786          // If we are the coordinator, only call ourself if 'excludeSelf' is false
787
if (false == isCurrentNodeCoordinator () ||
788              false == excludeSelf)
789             coordinatorOnly.addElement(this.jgmembers.elementAt (0));
790
791          rsp = this.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
792
793          if (rsp != null)
794          {
795             for (int i = 0; i < rsp.size(); i++)
796             {
797                Object JavaDoc item = rsp.elementAt(i);
798                if (item instanceof Rsp)
799                {
800                   Rsp response = (Rsp) item;
801                   // Only include received