KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ha > framework > server > HAPartitionImpl


1 /**
2  * JBoss, the OpenSource J2EE WebOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7
8 package org.jboss.ha.framework.server;
9
10 import java.io.ByteArrayInputStream JavaDoc;
11 import java.io.ByteArrayOutputStream JavaDoc;
12 import java.io.Serializable JavaDoc;
13 import java.text.SimpleDateFormat JavaDoc;
14 import java.util.ArrayList JavaDoc;
15 import java.util.Date JavaDoc;
16 import java.util.HashMap JavaDoc;
17 import java.util.Iterator JavaDoc;
18 import java.util.Vector JavaDoc;
19
20 import javax.naming.Context JavaDoc;
21 import javax.naming.InitialContext JavaDoc;
22 import javax.naming.Name JavaDoc;
23 import javax.naming.NameNotFoundException JavaDoc;
24 import javax.naming.Reference JavaDoc;
25 import javax.naming.StringRefAddr JavaDoc;
26 import javax.management.MBeanServer JavaDoc;
27
28 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
29
30 import org.jgroups.JChannel;
31 import org.jgroups.MergeView;
32 import org.jgroups.View;
33 import org.jgroups.Message;
34 import org.jgroups.blocks.GroupRequest;
35 import org.jgroups.blocks.MethodCall;
36 import org.jgroups.stack.IpAddress;
37 import org.jgroups.util.Rsp;
38 import org.jgroups.util.RspList;
39 import org.jgroups.util.Util;
40
41 import org.jboss.invocation.MarshalledValueInputStream;
42 import org.jboss.invocation.MarshalledValueOutputStream;
43 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
44 import org.jboss.ha.framework.interfaces.DistributedState;
45 import org.jboss.ha.framework.interfaces.HAPartition;
46 import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
47 import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
48 import org.jboss.ha.framework.interfaces.ClusterNode;
49
50 import org.jboss.naming.NonSerializableFactory;
51 import org.jboss.logging.Logger;
52
53 /**
54  * This class is an abstraction class for a JGroups RPCDispatch and JChannel.
55  * It is a default implementation of HAPartition for the
56  * <a HREF="http://www.jgroups.com/">JGroups</A> framework
57  *
58  * @author <a HREF="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
59  * @author <a HREF="mailto:bill@burkecentral.com">Bill Burke</a>.
60  * @author Scott.Stark@jboss.org
61  * @version $Revision: 1.43.2.6 $
62  */

63 public class HAPartitionImpl
64    extends org.jgroups.blocks.RpcDispatcher
65    implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
66       HAPartition
67 {
68    private static class NoHandlerForRPC implements Serializable JavaDoc
69    {
70       static final long serialVersionUID = -1263095408483622838L;
71    }
72
73    // Constants -----------------------------------------------------
74

75    // final MethodLookup method_lookup_clos = new MethodLookupClos();
76

77    // Attributes ----------------------------------------------------
78

79    protected HashMap JavaDoc rpcHandlers = new HashMap JavaDoc();
80    protected HashMap JavaDoc stateHandlers = new HashMap JavaDoc();
81    /** The HAMembershipListener and HAMembershipExtendedListeners */
82    protected ArrayList JavaDoc listeners = new ArrayList JavaDoc();
83    /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
84    protected ArrayList JavaDoc asynchListeners = new ArrayList JavaDoc();
85    /** The LinkedQueue<ViewChangeEvent> of changes to notify asynch listeners of */
86    protected LinkedQueue asynchViewChanges = new LinkedQueue();
87    /** The Thread used to send membership change notifications asynchronously */
88    protected Thread JavaDoc asynchNotifyThread;
89    /** The current cluster partition members */
90    protected Vector JavaDoc members = null;
91    protected Vector JavaDoc jgmembers = null;
92
93    public Vector JavaDoc history = null;
94
95    /** The partition members other than this node */
96    protected Vector JavaDoc otherMembers = null;
97    protected Vector JavaDoc jgotherMembers = null;
98    /** The JChannel name */
99    protected String JavaDoc partitionName;
100    /** the local JG IP Address */
101    protected org.jgroups.stack.IpAddress localJGAddress = null;
102    /** The cluster transport protocol address string */
103    protected String JavaDoc nodeName;
104    /** me as a ClusterNode */
105    protected ClusterNode me = null;
106    /** The timeout for cluster RPC calls */
107    protected long timeout = 60000;
108    /** The JGroups partition channel */
109    protected JChannel channel;
110    /** The cluster replicant manager */
111    protected DistributedReplicantManagerImpl replicantManager;
112    /** The cluster state manager */
113    protected DistributedStateImpl dsManager;
114    /** The cluster instance log category */
115    protected Logger log;
116    protected Logger clusterLifeCycleLog;
117    /** The current cluster view id */
118    protected long currentViewId = -1;
119    /** The JMX MBeanServer to use for registrations */
120    protected MBeanServer JavaDoc server;
121    /** Number of ms to wait for state */
122    protected long state_transfer_timeout=60000;
123
124    // Static --------------------------------------------------------
125

126    /**
127     * Creates an object from a byte buffer
128     */

129    public static Object JavaDoc objectFromByteBuffer (byte[] buffer) throws Exception JavaDoc
130    {
131       if(buffer == null)
132          return null;
133
134       ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(buffer);
135       MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
136       return mvis.readObject();
137    }
138    
139    /**
140     * Serializes an object into a byte buffer.
141     * The object has to implement interface Serializable or Externalizable
142     */

143    public static byte[] objectToByteBuffer (Object JavaDoc obj) throws Exception JavaDoc
144    {
145       ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
146       MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
147       mvos.writeObject(obj);
148       mvos.flush();
149       return baos.toByteArray();
150    }
151
152    public long getStateTransferTimeout() {
153       return state_transfer_timeout;
154    }
155
156    public void setStateTransferTimeout(long state_transfer_timeout) {
157       this.state_transfer_timeout=state_transfer_timeout;
158    }
159
160
161    public long getMethodCallTimeout() {
162       return timeout;
163    }
164
165    public void setMethodCallTimeout(long timeout) {
166       this.timeout=timeout;
167    }
168
169     // Constructors --------------------------------------------------
170

/**
 * Creates the partition as the three-argument constructor does and
 * additionally records the MBeanServer.
 *
 * @param partitionName the partition name
 * @param channel the JGroups channel to dispatch over
 * @param deadlock_detection passed through to the RpcDispatcher constructor
 * @param server the JMX MBeanServer to use for registrations
 * @throws Exception if the delegated constructor fails
 */
public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection, MBeanServer server) throws Exception
{
   this(partitionName, channel, deadlock_detection);
   this.server = server;
}
176    
177    public HAPartitionImpl(String JavaDoc partitionName, org.jgroups.JChannel channel, boolean deadlock_detection) throws Exception JavaDoc
178    {
179       super(channel, null, null, new Object JavaDoc(), deadlock_detection); // init RpcDispatcher with a fake target object
180
this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
181       this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
182       this.channel = channel;
183       this.partitionName = partitionName;
184       this.history = new Vector JavaDoc();
185       logHistory ("Partition object created");
186    }
187    
188     // Public --------------------------------------------------------
189

190    public void init() throws Exception JavaDoc
191    {
192       log.info("Initializing");
193       logHistory ("Initializing partition");
194
195       // Subscribe to dHA events comming generated by the org.jgroups. protocol stack
196
//
197
log.debug("setMembershipListener");
198       setMembershipListener(this);
199       log.debug("setMessageListener");
200       setMessageListener(this);
201       
202       // Create the DRM and link it to this HAPartition
203
//
204
log.debug("create replicant manager");
205       this.replicantManager = new DistributedReplicantManagerImpl(this, this.server);
206       log.debug("init replicant manager");
207       this.replicantManager.init();
208       log.debug("bind replicant manager");
209       
210       // Create the DS and link it to this HAPartition
211
//
212
log.debug("create distributed state");
213       this.dsManager = new DistributedStateImpl(this, this.server);
214       log.debug("init distributed state service");
215       this.dsManager.init();
216       log.debug("bind distributed state service");
217
218       
219       // Bind ourself in the public JNDI space
220
//
221
Context JavaDoc ctx = new InitialContext JavaDoc();
222       this.bind("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);
223       
224       log.debug("done initing.");
225    }
226    
227    public void startPartition() throws Exception JavaDoc
228    {
229       // get current JG group properties
230
//
231
logHistory ("Starting partition");
232       log.debug("get nodeName");
233       this.localJGAddress = (IpAddress)channel.getLocalAddress();
234       this.me = new ClusterNode(this.localJGAddress);
235       this.nodeName = this.me.getName();
236
237       log.debug("Get current members");
238       View view = channel.getView();
239       this.jgmembers = (Vector JavaDoc)view.getMembers().clone();
240       this.members = translateAddresses(this.jgmembers); // TRANSLATE
241
log.info("Number of cluster members: " + members.size());
242       for(int m = 0; m > members.size(); m ++)
243       {
244          Object JavaDoc node = members.get(m);
245          log.debug(node);
246       }
247       // Keep a list of other members only for "exclude-self" RPC calls
248
//
249
this.jgotherMembers = (Vector JavaDoc)view.getMembers().clone();
250       this.jgotherMembers.remove (channel.getLocalAddress());
251       this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
252
log.info ("Other members: " + this.otherMembers.size ());
253
254       verifyNodeIsUnique (view.getMembers());
255
256       // Update the initial view id
257
//
258
this.currentViewId = view.getVid().getId();
259
260       // We must now synchronize new state transfer subscriber
261
//
262
log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
263       boolean rc = channel.getState(null, this.state_transfer_timeout);
264       if (rc)
265          log.debug("State was retrieved successfully");
266       else
267          log.debug("State could not be retrieved, (must be first member of group)");
268       
269       // We start now able to start our DRM and DS
270
//
271
this.replicantManager.start();
272       this.dsManager.start();
273
274       // Create the asynch listener handler thread
275
AsynchViewChangeHandler asynchHandler = new AsynchViewChangeHandler();
276       asynchNotifyThread = new Thread JavaDoc(asynchHandler, "AsynchHAMembershipListener Thread");
277       asynchNotifyThread.start();
278    }
279
280    public void closePartition() throws Exception JavaDoc
281    {
282       logHistory ("Closing partition");
283       log.info("Closing partition " + partitionName);
284
285       try
286       {
287          asynchNotifyThread.interrupt();
288       }
289       catch( Exception JavaDoc e)
290       {
291          log.warn("Failed to interrupte asynchNotifyThread", e);
292       }
293
294       // Stop the DRM and DS services
295
//
296
try
297       {
298          this.replicantManager.stop();
299       }
300       catch (Exception JavaDoc e)
301       {
302          log.error("operation failed", e);
303       }
304
305       try
306       {
307          this.dsManager.stop();
308       }
309       catch (Exception JavaDoc e)
310       {
311          log.error("operation failed", e);
312       }
313
314 // NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
315
// add the destroyPartition() step
316
try
317       {
318 // channel.close();
319
channel.disconnect();
320       }
321       catch (Exception JavaDoc e)
322       {
323          log.error("operation failed", e);
324       }
325
326 // String boundName = "/HAPartition/" + partitionName;
327
//
328
// InitialContext ctx = new InitialContext();
329
// try
330
// {
331
//
332
// ctx.unbind(boundName);
333
// }
334
// finally
335
// {
336
// ctx.close();
337
// }
338
// NonSerializableFactory.unbind (boundName);
339

340       log.info("Partition " + partitionName + " closed.");
341    }
342
343 // NR 200505 : [JBCLUSTER-38] destroy partition close the channel
344
public void destroyPartition() throws Exception JavaDoc
345    {
346
347       try
348       {
349          this.replicantManager.destroy();
350       }
351       catch (Exception JavaDoc e)
352       {
353          log.error("operation failed", e);
354       }
355
356       try
357       {
358          this.dsManager.destroy();
359       }
360       catch (Exception JavaDoc e)
361       {
362          log.error("operation failed", e);
363       }
364       try
365       {
366          channel.close();
367       }
368       catch (Exception JavaDoc e)
369       {
370          log.error("operation failed", e);
371       }
372       
373        String JavaDoc boundName = "/HAPartition/" + partitionName;
374
375       InitialContext JavaDoc ctx = new InitialContext JavaDoc();
376       try
377       {
378          
379          ctx.unbind(boundName);
380       }
381       finally
382       {
383          ctx.close();
384       }
385       NonSerializableFactory.unbind (boundName);
386
387       log.info("Partition " + partitionName + " destroyed.");
388   }
389
390    // org.jgroups.MessageListener implementation ----------------------------------------------
391

392    // MessageListener methods
393
//
394
public byte[] getState()
395    {
396       logHistory ("getState called on partition");
397       boolean debug = log.isDebugEnabled();
398       
399       log.debug("getState called.");
400       try
401       {
402          // we now get the sub-state of each HAPartitionStateTransfer subscribers and
403
// build a "macro" state
404
//
405
HashMap JavaDoc state = new HashMap JavaDoc();
406          Iterator JavaDoc keys = stateHandlers.keySet().iterator();
407          while (keys.hasNext())
408          {
409             String JavaDoc key = (String JavaDoc)keys.next();
410             HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
411             if (debug)
412                log.debug("getState for " + key);
413             state.put(key, subscriber.getCurrentState());
414          }
415          return objectToByteBuffer(state);
416       }
417       catch (Exception JavaDoc ex)
418       {
419          log.error("getState failed", ex);
420       }
421       return null;
422    }
423    
424    public void setState(byte[] obj)
425    {
426       logHistory ("setState called on partition");
427       try
428       {
429          log.debug("setState called");
430          if (obj == null)
431          {
432             log.debug("state is null");
433             return;
434          }
435          
436          long used_mem_before, used_mem_after;
437          int state_size=obj != null? obj.length : 0;
438          Runtime JavaDoc rt=Runtime.getRuntime();
439          used_mem_before=rt.totalMemory() - rt.freeMemory();
440
441          HashMap JavaDoc state = (HashMap JavaDoc)objectFromByteBuffer(obj);
442          java.util.Iterator JavaDoc keys = state.keySet().iterator();
443          while (keys.hasNext())
444          {
445             String JavaDoc key = (String JavaDoc)keys.next();
446             log.debug("setState for " + key);
447             Object JavaDoc someState = state.get(key);
448             HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
449             if (subscriber != null)
450             {
451                subscriber.setCurrentState((java.io.Serializable JavaDoc)someState);
452             }
453             else
454             {
455                log.debug("There is no stateHandler for: " + key);
456             }
457          }
458
459          used_mem_after=rt.totalMemory() - rt.freeMemory();
460          log.debug("received a state of " + state_size + " bytes; expanded memory by " +
461                (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
462                ", used memory after: " + used_mem_after + ")");
463       }
464       catch (Exception JavaDoc ex)
465       {
466          log.error("setState failed", ex);
467       }
468    }
469    
470    public void receive(org.jgroups.Message msg)
471    { /* complete */}
472    
473    // org.jgroups.MembershipListener implementation ----------------------------------------------
474

475    public void suspect(org.jgroups.Address suspected_mbr)
476    {
477       logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
478       if (isCurrentNodeCoordinator ())
479          clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
480       else
481          log.info("Suspected member: " + suspected_mbr);
482    }
483
484    public void block() {}
485    
486    /** Notification of a cluster view change. This is done from the JG protocol
487     * handlder thread and we must be careful to not unduly block this thread.
488     * Because of this there are two types of listeners, synchronous and
489     * asynchronous. The synchronous listeners are messaged with the view change
490     * event using the calling thread while the asynchronous listeners are
491     * messaged using a seperate thread.
492     *
493     * @param newView
494     */

495    public void viewAccepted(View newView)
496    {
497       try
498       {
499          // we update the view id
500
//
501
this.currentViewId = newView.getVid().getId();
502
503          // Keep a list of other members only for "exclude-self" RPC calls
504
//
505
this.jgotherMembers = (Vector JavaDoc)newView.getMembers().clone();
506          this.jgotherMembers.remove (channel.getLocalAddress());
507          this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
508
Vector JavaDoc translatedNewView = translateAddresses ((Vector JavaDoc)newView.getMembers().clone());
509          logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
510                      " (old view: " + this.members + " )");
511
512
513          // Save the previous view and make a copy of the new view
514
Vector JavaDoc oldMembers = this.members;
515
516          Vector JavaDoc newjgMembers = (Vector JavaDoc)newView.getMembers().clone();
517          Vector JavaDoc newMembers = translateAddresses(newjgMembers); // TRANSLATE
518
if (this.members == null)
519          {
520             // Initial viewAccepted
521
this.members = newMembers;
522             this.jgmembers = newjgMembers;
523             log.debug("ViewAccepted: initial members set");
524             return;
525          }
526          this.members = newMembers;
527          this.jgmembers = newjgMembers;
528
529          int difference = 0;
530          if (oldMembers == null)
531             difference = newMembers.size () - 1;
532          else
533             difference = newMembers.size () - oldMembers.size ();
534          
535          if (isCurrentNodeCoordinator ())
536             clusterLifeCycleLog.info ("New cluster view for partition " + this.partitionName + " (id: " +
537                                       this.currentViewId + ", delta: " + difference + ") : " + this.members);
538          else
539             log.info("New cluster view for partition " + this.partitionName + ": " +
540                      this.currentViewId + " (" + this.members + " delta: " + difference + ")");
541
542          // Build a ViewChangeEvent for the asynch listeners
543
ViewChangeEvent event = new ViewChangeEvent();
544          event.viewId = currentViewId;
545          event.allMembers = translatedNewView;
546          event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
547          event.newMembers = getNewMembers(oldMembers, event.allMembers);
548          event.originatingGroups = null;
549          // if the new view occurs because of a merge, we first inform listeners of the merge
550
if(newView instanceof MergeView)
551          {
552             MergeView mergeView = (MergeView) newView;
553             event.originatingGroups = mergeView.getSubgroups();
554          }
555
556          log.debug("membership changed from " + this.members.size() + " to "
557             + event.allMembers.size());
558          // Put the view change to the asynch queue
559
this.asynchViewChanges.put(event);
560
561          // Broadcast the new view to the synchronous view change listeners
562
this.notifyListeners(listeners, event.viewId, event.allMembers,
563             event.deadMembers, event.newMembers, event.originatingGroups);
564       }
565       catch (Exception JavaDoc ex)
566       {
567          log.error("ViewAccepted failed", ex);
568       }
569    }
570
571    // HAPartition implementation ----------------------------------------------
572

573    public String JavaDoc getNodeName()
574    {
575       return nodeName;
576    }
577    
578    public String JavaDoc getPartitionName()
579    {
580       return partitionName;
581    }
582    
583    public DistributedReplicantManager getDistributedReplicantManager()
584    {
585       return replicantManager;
586    }
587    
588    public DistributedState getDistributedStateService()
589    {
590       return this.dsManager;
591    }
592
593    public long getCurrentViewId()
594    {
595       return this.currentViewId;
596    }
597    
598    public Vector JavaDoc getCurrentView()
599    {
600       Vector JavaDoc result = new Vector JavaDoc (this.members.size());
601       for (int i = 0; i < members.size(); i++)
602       {
603          result.add( ((ClusterNode) members.elementAt(i)).getName() );
604       }
605       return result;
606    }
607
608    public ClusterNode[] getClusterNodes ()
609    {
610       ClusterNode[] nodes = new ClusterNode[this.members.size()];
611       this.members.toArray(nodes);
612       return nodes;
613    }
614
615    public ClusterNode getClusterNode ()
616    {
617       return me;
618    }
619
620    public boolean isCurrentNodeCoordinator ()
621    {
622       if(this.members == null || this.members.size() == 0 || this.me == null)
623          return false;
624      return this.members.elementAt (0).equals (this.me);
625    }
626
627    // ***************************
628
// ***************************
629
// RPC multicast communication
630
// ***************************
631
// ***************************
632
//
633
public void registerRPCHandler(String JavaDoc objName, Object JavaDoc subscriber)
634    {
635       rpcHandlers.put(objName, subscriber);
636    }
637    
638    public void unregisterRPCHandler(String JavaDoc objName, Object JavaDoc subscriber)
639    {
640       rpcHandlers.remove(objName);
641    }
642       
643
644    /**
645     *
646     * @param objName
647     * @param methodName
648     * @param args
649     * @param excludeSelf
650     * @return
651     * @throws Exception
652     * @deprecated Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead
653     */

654    public ArrayList JavaDoc callMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
655       Object JavaDoc[] args, boolean excludeSelf) throws Exception JavaDoc
656    {
657       return callMethodOnCluster(objName, methodName, args, null, excludeSelf);
658    }
659
660    /**
661     * This function is an abstraction of RpcDispatcher.
662     */

663    public ArrayList JavaDoc callMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
664       Object JavaDoc[] args, Class JavaDoc[] types, boolean excludeSelf) throws Exception JavaDoc
665    {
666       return callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.timeout);
667    }
668
669
670    public ArrayList JavaDoc callMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
671        Object JavaDoc[] args, Class JavaDoc[] types, boolean excludeSelf, long methodTimeout) throws Exception JavaDoc
672    {
673       ArrayList JavaDoc rtn = new ArrayList JavaDoc();
674       MethodCall m=null;
675       RspList rsp = null;
676       boolean trace = log.isTraceEnabled();
677
678       if(types != null)
679          m=new MethodCall(objName + "." + methodName, args, types);
680       else
681          m=new MethodCall(objName + "." + methodName, args);
682
683       if (excludeSelf)
684       {
685          if( trace )
686          {
687             log.trace("callMethodOnCluster(true), objName="+objName
688                +", methodName="+methodName+", members="+jgotherMembers);
689          }
690          rsp = this.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
691       }
692       else
693       {
694          if( trace )
695          {
696             log.trace("callMethodOnCluster(false), objName="+objName
697                +", methodName="+methodName+", members="+members);
698          }
699          rsp = this.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
700       }
701
702       if (rsp != null)
703       {
704          for (int i = 0; i < rsp.size(); i++)
705          {
706             Object JavaDoc item = rsp.elementAt(i);
707             if (item instanceof Rsp)
708             {
709                Rsp response = (Rsp) item;
710                // Only include received responses
711
boolean wasReceived = response.wasReceived();
712                if( wasReceived == true )
713                {
714                   item = response.getValue();
715                   if (!(item instanceof NoHandlerForRPC))
716                      rtn.add(item);
717                }
718                else if( trace )
719                   log.trace("Ignoring non-received response: "+response);
720             }
721             else
722             {
723                if (!(item instanceof NoHandlerForRPC))
724                   rtn.add(item);
725                else if( trace )
726                   log.trace("Ignoring NoHandlerForRPC");
727             }
728          }
729       }
730
731       return rtn;
732     }
733
734    /**
735     * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
736     * cluster.
737     * and is replaced
738     * @param objName
739     * @param methodName
740     * @param args
741     * @param types
742     * @param excludeSelf
743     * @return
744     * @throws Exception
745     */

746    public ArrayList JavaDoc callMethodOnCoordinatorNode(String JavaDoc objName, String JavaDoc methodName,
747           Object JavaDoc[] args, Class JavaDoc[] types,boolean excludeSelf) throws Exception JavaDoc
748       {
749       return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf,this.timeout);
750       }
751
752    /**
753     * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
754     * cluster.
755     * and is replaced
756     * @param objName
757     * @param methodName
758     * @param args
759     * @param types
760     * @param excludeSelf
761     * @param methodTimeout
762     * @return
763     * @throws Exception
764     */

765    public ArrayList JavaDoc callMethodOnCoordinatorNode(String JavaDoc objName, String JavaDoc methodName,
766           Object JavaDoc[] args, Class JavaDoc[] types,boolean excludeSelf, long methodTimeout) throws Exception JavaDoc
767       {
768          ArrayList JavaDoc rtn = new ArrayList JavaDoc();
769          MethodCall m=null;
770          RspList rsp = null;
771          boolean trace = log.isTraceEnabled();
772
773          if(types != null)
774             m=new MethodCall(objName + "." + methodName, args, types);
775          else
776             m=new MethodCall(objName + "." + methodName, args);
777
778          if( trace )
779          {
780             log.trace("callMethodOnCoordinatorNode(false), objName="+objName
781                +", methodName="+methodName);
782          }
783
784          // the first cluster view member is the coordinator
785
Vector JavaDoc coordinatorOnly = new Vector JavaDoc();
786          // If we are the coordinator, only call ourself if 'excludeSelf' is false
787
if (false == isCurrentNodeCoordinator () ||
788              false == excludeSelf)
789             coordinatorOnly.addElement(this.jgmembers.elementAt (0));
790
791          rsp = this.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
792
793          if (rsp != null)
794          {
795             for (int i = 0; i < rsp.size(); i++)
796             {
797                Object JavaDoc item = rsp.elementAt(i);
798                if (item instanceof Rsp)
799                {
800                   Rsp response = (Rsp) item;
801                   // Only include received responses
802
boolean wasReceived = response.wasReceived();
803                   if( wasReceived == true )
804                   {
805                      item = response.getValue();
806                      if (!(item instanceof NoHandlerForRPC))
807                         rtn.add(item);
808                   }
809                   else if( trace )
810                      log.trace("Ignoring non-received response: "+response);
811                }
812                else
813                {
814                   if (!(item instanceof NoHandlerForRPC))
815                      rtn.add(item);
816                   else if( trace )
817                      log.trace("Ignoring NoHandlerForRPC");
818                }
819             }
820          }
821
822          return rtn;
823        }
824
825
826    /**
827     *
828     * @param objName
829     * @param methodName
830     * @param args
831     * @param excludeSelf
832     * @throws Exception
833     * @deprecated Use {@link #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)} instead
834     */

835    public void callAsynchMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
836       Object JavaDoc[] args, boolean excludeSelf) throws Exception JavaDoc {
837       callAsynchMethodOnCluster(objName, methodName, args, null, excludeSelf);
838    }
839
840
841
842
843
844    /**
845     * This function is an abstraction of RpcDispatcher for asynchronous messages
846     */

847    public void callAsynchMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
848       Object JavaDoc[] args, Class JavaDoc[] types, boolean excludeSelf) throws Exception JavaDoc
849    {
850       MethodCall m = null;
851       boolean trace = log.isTraceEnabled();
852
853       if(types != null)
854          m=new MethodCall(objName + "." + methodName, args, types);
855       else
856          m=new MethodCall(objName + "." + methodName, args);
857
858       if (excludeSelf)
859       {
860          if( trace )
861          {
862             log.trace("callAsynchMethodOnCluster(true), objName="+objName
863                +", methodName="+methodName+", members="+jgotherMembers);
864          }
865          this.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, timeout);
866       }
867       else
868       {
869          if( trace )
870          {
871             log.trace("callAsynchMethodOnCluster(false), objName="+objName
872                +", methodName="+methodName+", members="+members);
873          }
874          this.callRemoteMethods(null, m, GroupRequest.GET_NONE, timeout);
875       }
876    }
877    
// *************************
// *************************
// State transfer management
// *************************
// *************************
//
public void subscribeToStateTransferEvents(String JavaDoc objectName, HAPartitionStateTransfer subscriber)
885    {
886       stateHandlers.put(objectName, subscriber);
887    }
888    
889    public void unsubscribeFromStateTransferEvents(String JavaDoc objectName, HAPartitionStateTransfer subscriber)
890    {
891       stateHandlers.remove(objectName);
892    }
893    
// *************************
// *************************
// Group Membership listeners
// *************************
// *************************
//
public void registerMembershipListener(HAMembershipListener listener)
901    {
902       synchronized(this.listeners)
903       {
904          this.listeners.add(listener);
905       }
906    }
907    
908    public void unregisterMembershipListener(HAMembershipListener listener)
909    {
910       synchronized(this.listeners)
911       {
912          this.listeners.remove(listener);
913       }
914    }
915    
916    // org.jgroups.RpcDispatcher overrides ---------------------------------------------------
917

918    /**
919     * Message contains MethodCall. Execute it against *this* object and return result.
920     * Use MethodCall.Invoke() to do this. Return result.
921     *
922     * This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.
923     * @param req The org.jgroups. representation of the method invocation
924     * @return The serializable return value from the invocation
925     */

926    public Object JavaDoc handle(Message req)
927    {
928       Object JavaDoc body = null;
929       Object JavaDoc retval = null;
930       MethodCall method_call = null;
931       boolean trace = log.isTraceEnabled();
932       
933       if( trace )
934          log.trace("Partition " + partitionName + " received msg");
935       if(req == null || req.getBuffer() == null)
936       {
937          log.warn("RpcProtocol.Handle(): message or message buffer is null !");
938          return null;
939       }
940       
941       try
942       {
943          body = Util.objectFromByteBuffer(req.getBuffer());
944       }
945       catch(Exception JavaDoc e)
946       {
947          log.warn("RpcProtocol.Handle(): " + e);
948          return null;
949       }
950       
951       if(body == null || !(body instanceof MethodCall))
952       {
953          log.warn("RpcProtocol.Handle(): message does not contain a MethodCall object !");
954          return null;
955       }
956       
957       // get method call informations
958
//
959
method_call = (MethodCall)body;
960       String JavaDoc methodName = method_call.getName();
961       
962       if( trace )
963          log.trace("pre methodName: " + methodName);
964       
965       int idx = methodName.lastIndexOf('.');
966       String JavaDoc handlerName = methodName.substring(0, idx);
967       String JavaDoc newMethodName = methodName.substring(idx + 1);
968       
969       if( trace )
970       {
971          log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
972          log.trace("Handle: " + methodName);
973       }
974       
975       // prepare method call
976
method_call.setName(newMethodName);
977       Object JavaDoc handler = rpcHandlers.get(handlerName);
978       if (handler == null)
979       {
980          if( trace )
981             log.debug("No rpc handler registered under: "+handlerName);
982          return new NoHandlerForRPC();
983       }
984
985       /* Invoke it and just return any exception with trace level logging of
986       the exception. The exception semantics of a group rpc call are weak as
987       the return value may be a normal return value or the exception thrown.
988       */

989       try
990       {
991          retval = method_call.invoke(handler);
992          if( trace )
993             log.trace("rpc call return value: "+retval);
994       }
995       catch (Throwable JavaDoc t)
996       {
997          if( trace )
998             log.trace("rpc call threw exception", t);
999          retval = t;
1000      }
1001
1002      return retval;
1003   }
1004
1005   // Package protected ---------------------------------------------
1006

1007   // Protected -----------------------------------------------------
1008

1009   protected void verifyNodeIsUnique (Vector JavaDoc javaGroupIpAddresses) throws Exception JavaDoc
1010   {
1011      byte[] localUniqueName = this.localJGAddress.getAdditionalData();
1012      if (localUniqueName == null)
1013         log.warn("No additional information has been found in the JavaGroup address: " +
1014                  "make sure you are running with a correct version of JGroups and that the protocol " +
1015                  " you are using supports the 'additionalData' behaviour");
1016
1017      for (int i = 0; i < javaGroupIpAddresses.size(); i++)
1018      {
1019         IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
1020         if (!address.equals(this.localJGAddress))
1021         {
1022            if (localUniqueName.equals(address.getAdditionalData()))
1023               throw new Exception JavaDoc ("Local node removed from cluster (" + this.localJGAddress + "): another node (" + address + ") publicizing the same name was already there");
1024         }
1025      }
1026   }
1027
1028   /**
1029    * Helper method that binds the partition in the JNDI tree.
1030    * @param jndiName Name under which the object must be bound
1031    * @param who Object to bind in JNDI
1032    * @param classType Class type under which should appear the bound object
1033    * @param ctx Naming context under which we bind the object
1034    * @throws Exception Thrown if a naming exception occurs during binding
1035    */

1036   protected void bind(String JavaDoc jndiName, Object JavaDoc who, Class JavaDoc classType, Context JavaDoc ctx) throws Exception JavaDoc
1037   {
1038      // Ah ! This service isn't serializable, so we use a helper class
1039
//
1040
NonSerializableFactory.bind(jndiName, who);
1041      Name JavaDoc n = ctx.getNameParser("").parse(jndiName);
1042      while (n.size () > 1)
1043      {
1044         String JavaDoc ctxName = n.get (0);
1045         try
1046         {
1047            ctx = (Context JavaDoc)ctx.lookup (ctxName);
1048         }
1049         catch (NameNotFoundException JavaDoc e)
1050         {
1051            log.debug ("creating Subcontext" + ctxName);
1052            ctx = ctx.createSubcontext (ctxName);
1053         }
1054         n = n.getSuffix (1);
1055      }
1056
1057      // The helper class NonSerializableFactory uses address type nns, we go on to
1058
// use the helper class to bind the service object in JNDI
1059
//
1060
StringRefAddr JavaDoc addr = new StringRefAddr JavaDoc("nns", jndiName);
1061      Reference JavaDoc ref = new Reference JavaDoc(classType.getName (), addr, NonSerializableFactory.class.getName (), null);
1062      ctx.rebind (n.get (0), ref);
1063   }
1064   
1065   /**
1066    * Helper method that returns a vector of dead members from two input vectors: new and old vectors of two views.
1067    * Dead members are old - new members.
1068    * @param oldMembers Vector of old members
1069    * @param newMembers Vector of new members
1070    * @return Vector of members that have died between the two views, can be empty.
1071    */

1072   protected Vector JavaDoc getDeadMembers(Vector JavaDoc oldMembers, Vector JavaDoc newMembers)
1073   {
1074      boolean debug = log.isDebugEnabled();
1075      if(oldMembers == null) oldMembers=new Vector JavaDoc();
1076      if(newMembers == null) newMembers=new Vector JavaDoc();
1077      Vector JavaDoc dead=(Vector JavaDoc)oldMembers.clone();
1078      dead.removeAll(newMembers);
1079      if(dead.size() > 0 && debug)
1080         log.debug("dead members: " + dead);
1081      return dead;
1082   }
1083   
1084   /**
1085    * Helper method that returns a vector of new members from two input vectors: new and old vectors of two views.
1086    * @param oldMembers Vector of old members
1087    * @param allMembers Vector of new members
1088    * @return Vector of members that have joined the partition between the two views
1089    */

1090   protected Vector JavaDoc getNewMembers(Vector JavaDoc oldMembers, Vector JavaDoc allMembers)
1091   {
1092      if(oldMembers == null) oldMembers=new Vector JavaDoc();
1093      if(allMembers == null) allMembers=new Vector JavaDoc();
1094      Vector JavaDoc newMembers=(Vector JavaDoc)allMembers.clone();
1095      newMembers.removeAll(oldMembers);
1096      return newMembers;
1097   }
1098
1099   protected void notifyListeners(ArrayList JavaDoc theListeners, long viewID,
1100      Vector JavaDoc allMembers, Vector JavaDoc deadMembers, Vector JavaDoc newMembers,
1101      Vector JavaDoc originatingGroups)
1102   {
1103      log.debug("Begin notifyListeners, viewID: "+viewID);
1104      synchronized(theListeners)
1105      {
1106         for (int i = 0; i < theListeners.size(); i++)
1107         {
1108            HAMembershipListener aListener = null;
1109            try
1110            {
1111               aListener = (HAMembershipListener) theListeners.get(i);
1112               if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
1113               {
1114                  HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
1115                  exListener.membershipChangedDuringMerge (deadMembers, newMembers,
1116                     allMembers, originatingGroups);
1117               }
1118               else
1119               {
1120                  aListener.membershipChanged(deadMembers, newMembers, allMembers);
1121               }
1122            }
1123            catch (Throwable JavaDoc e)
1124            {
1125               // a problem in a listener should not prevent other members to receive the new view
1126
log.warn("HAMembershipListener callback failure: "+aListener, e);
1127            }
1128         }
1129      }
1130      log.debug("End notifyListeners, viewID: "+viewID);
1131   }
1132
1133   protected Vector JavaDoc translateAddresses (Vector JavaDoc jgAddresses)
1134   {
1135      if (jgAddresses == null)
1136         return null;
1137
1138      Vector JavaDoc result = new Vector JavaDoc (jgAddresses.size());
1139      for (int i = 0; i < jgAddresses.size(); i++)
1140      {
1141         IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
1142         result.add(new ClusterNode (addr));
1143      }
1144
1145      return result;
1146   }
1147
1148   public void logHistory (String JavaDoc message)
1149   {
1150      try
1151      {
1152         history.add(new SimpleDateFormat JavaDoc().format (new Date JavaDoc()) + " : " + message);
1153      }
1154      catch (Exception JavaDoc ignored){}
1155   }
1156
1157   /** A simply data class containing the view change event needed to
1158    * message the HAMembershipListeners
1159    */

1160   private static class ViewChangeEvent
1161   {
1162      long viewId;
1163      Vector JavaDoc deadMembers;
1164      Vector JavaDoc newMembers;
1165      Vector JavaDoc allMembers;
1166      Vector JavaDoc originatingGroups;
1167   }
1168
1169   /** The Runnable that handles the asynchronous listener notifications
1170    */

1171   private class AsynchViewChangeHandler implements Runnable JavaDoc
1172   {
1173      public void run()
1174      {
1175         log.debug("Begin AsynchViewChangeHandler");
1176         while( true )
1177         {
1178            try
1179            {
1180               ViewChangeEvent event = (ViewChangeEvent) asynchViewChanges.take();
1181               notifyListeners(asynchListeners, event.viewId, event.allMembers,
1182                  event.deadMembers, event.newMembers, event.originatingGroups);
1183            }
1184            catch(InterruptedException JavaDoc e)
1185            {
1186               log.debug("AsynchViewChangeHandler interrupted", e);
1187               break;
1188            }
1189         }
1190         log.debug("End AsynchViewChangeHandler");
1191      }
1192   }
1193
1194   // Private -------------------------------------------------------
1195

1196   // Inner classes -------------------------------------------------
1197

1198}
1199
Popular Tags