KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > ha > framework > server > ClusterPartition


1 /*
2   * JBoss, Home of Professional Open Source
3   * Copyright 2005, JBoss Inc., and individual contributors as indicated
4   * by the @authors tag. See the copyright.txt in the distribution for a
5   * full listing of individual contributors.
6   *
7   * This is free software; you can redistribute it and/or modify it
8   * under the terms of the GNU Lesser General Public License as
9   * published by the Free Software Foundation; either version 2.1 of
10   * the License, or (at your option) any later version.
11   *
12   * This software is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this software; if not, write to the Free
19   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21   */

22 package org.jboss.ha.framework.server;
23
24 import java.io.ByteArrayInputStream JavaDoc;
25 import java.io.ByteArrayOutputStream JavaDoc;
26 import java.io.IOException JavaDoc;
27 import java.io.InputStream JavaDoc;
28 import java.io.OutputStream JavaDoc;
29 import java.io.Serializable JavaDoc;
30 import java.net.InetAddress JavaDoc;
31 import java.rmi.dgc.VMID JavaDoc;
32 import java.rmi.server.UID JavaDoc;
33 import java.text.SimpleDateFormat JavaDoc;
34 import java.util.ArrayList JavaDoc;
35 import java.util.Date JavaDoc;
36 import java.util.HashMap JavaDoc;
37 import java.util.Iterator JavaDoc;
38 import java.util.Map JavaDoc;
39 import java.util.Vector JavaDoc;
40
41 import javax.naming.Context JavaDoc;
42 import javax.naming.InitialContext JavaDoc;
43 import javax.naming.Name JavaDoc;
44 import javax.naming.NameNotFoundException JavaDoc;
45 import javax.naming.Reference JavaDoc;
46 import javax.naming.StringRefAddr JavaDoc;
47
48 import org.jboss.cache.Cache;
49 import org.jboss.ha.framework.interfaces.ClusterNode;
50 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
51 import org.jboss.ha.framework.interfaces.DistributedState;
52 import org.jboss.ha.framework.interfaces.HAPartition;
53 import org.jboss.invocation.MarshalledValueInputStream;
54 import org.jboss.invocation.MarshalledValueOutputStream;
55 import org.jboss.logging.Logger;
56 import org.jboss.naming.NonSerializableFactory;
57 import org.jboss.system.ServiceMBeanSupport;
58 import org.jboss.system.server.ServerConfigUtil;
59 import org.jgroups.Channel;
60 import org.jgroups.Event;
61 import org.jgroups.ExtendedMessageListener;
62 import org.jgroups.JChannel;
63 import org.jgroups.MembershipListener;
64 import org.jgroups.MergeView;
65 import org.jgroups.Message;
66 import org.jgroups.MessageListener;
67 import org.jgroups.Version;
68 import org.jgroups.View;
69 import org.jgroups.blocks.GroupRequest;
70 import org.jgroups.blocks.MethodCall;
71 import org.jgroups.blocks.RequestHandler;
72 import org.jgroups.blocks.RpcDispatcher;
73 import org.jgroups.debug.Debugger;
74 import org.jgroups.jmx.JChannelFactoryMBean;
75 import org.jgroups.stack.IpAddress;
76 import org.jgroups.util.Rsp;
77 import org.jgroups.util.RspList;
78
79 /**
80  * {@link HAPartition} implementation based on a
81  * <a HREF="http://www.jgroups.com/">JGroups</a> <code>RpcDispatcher</code>
82  * and a multiplexed <code>JChannel</code>.
83  *
84  * @author <a HREF="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
85  * @author <a HREF="mailto:bill@burkecentral.com">Bill Burke</a>.
86  * @author Scott.Stark@jboss.org
87  * @author brian.stansberry@jboss.com
88  * @version $Revision: 58577 $
89  */

90 public class ClusterPartition
91    extends ServiceMBeanSupport
92    implements MembershipListener, HAPartition,
93               AsynchEventHandler.AsynchEventProcessor,
94               ClusterPartitionMBean
95 {
96    private static final byte NULL_VALUE = 0;
97    private static final byte SERIALIZABLE_VALUE = 1;
98    // TODO add Streamable support
99
// private static final byte STREAMABLE_VALUE = 2;
100

101    /**
102     * Returned when an RPC call arrives for a service that isn't registered.
103     */

104    private static class NoHandlerForRPC implements Serializable JavaDoc
105    {
106       static final long serialVersionUID = -1263095408483622838L;
107    }
108    
109    private static class StateStreamEnd implements Serializable JavaDoc
110    {
111       /** The serialVersionUID */
112       private static final long serialVersionUID = -3705345735451504946L;
113    }
114
115    // Constants -----------------------------------------------------
116

117    // final MethodLookup method_lookup_clos = new MethodLookupClos();
118

119    // Attributes ----------------------------------------------------
120

121    protected ClusterPartitionConfig config;
122    protected HashMap JavaDoc rpcHandlers = new HashMap JavaDoc();
123    protected HashMap JavaDoc stateHandlers = new HashMap JavaDoc();
124    /** Do we send any membership change notifications synchronously? */
125    protected boolean allowSyncListeners = false;
126    /** The HAMembershipListener and HAMembershipExtendedListeners */
127    protected ArrayList JavaDoc synchListeners = new ArrayList JavaDoc();
128    /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
129    protected ArrayList JavaDoc asynchListeners = new ArrayList JavaDoc();
130    /** The handler used to send membership change notifications asynchronously */
131    protected AsynchEventHandler asynchHandler;
132    /** The current cluster partition members */
133    protected Vector JavaDoc members = null;
134    protected Vector JavaDoc jgmembers = null;
135
136    public Vector JavaDoc history = null;
137
138    /** The partition members other than this node */
139    protected Vector JavaDoc otherMembers = null;
140    protected Vector JavaDoc jgotherMembers = null;
141    /** the local JG IP Address */
142    protected org.jgroups.stack.IpAddress localJGAddress = null;
143    /** The cluster transport protocol address string */
144    protected String JavaDoc nodeName;
145    /** me as a ClusterNode */
146    protected ClusterNode me = null;
147    /** The JGroups partition channel */
148    protected JChannel channel;
149    /** The cluster replicant manager */
150    protected DistributedReplicantManager replicantManager;
151    /** The cluster instance log category */
152    protected Logger log;
153    protected Logger clusterLifeCycleLog;
154    /** The current cluster view id */
155    protected long currentViewId = -1;
156    
157    private RpcDispatcher dispatcher = null;
158
159    /**
160     * True if serviceState was initialized during start-up.
161     */

162    protected boolean isStateSet = false;
163
164    /**
165     * An exception occuring upon fetch serviceState.
166     */

167    protected Exception JavaDoc setStateException;
168    private final Object JavaDoc stateLock = new Object JavaDoc();
169    private final MessageListenerAdapter messageListener = new MessageListenerAdapter();
170    private Debugger debugger;
171    private boolean selfCreatedDRM;
172
173    // Static --------------------------------------------------------
174

175    /**
176     * Creates an object from a byte buffer
177     */

178    public static Object JavaDoc objectFromByteBuffer (byte[] buffer) throws Exception JavaDoc
179    {
180       if(buffer == null)
181          return null;
182
183       ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(buffer);
184       MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
185       return mvis.readObject();
186    }
187    
188    /**
189     * Serializes an object into a byte buffer.
190     * The object has to implement interface Serializable or Externalizable
191     */

192    public static byte[] objectToByteBuffer (Object JavaDoc obj) throws Exception JavaDoc
193    {
194       ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
195       MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
196       mvos.writeObject(obj);
197       mvos.flush();
198       return baos.toByteArray();
199    }
200
201    private static JChannel createMuxChannel(ClusterPartitionConfig config)
202    {
203       JChannelFactoryMBean factory = config.getMultiplexer();
204       if (factory == null)
205          throw new IllegalStateException JavaDoc("HAPartitionConfig has no JChannelFactory");
206       String JavaDoc stack = config.getMultiplexerStack();
207       if (stack == null)
208          throw new IllegalStateException JavaDoc("HAPartitionConfig has no multiplexer stack");
209       try
210       {
211          return (JChannel) factory.createMultiplexerChannel(stack, config.getMultiplexerStack());
212       }
213       catch (RuntimeException JavaDoc e)
214       {
215          throw e;
216       }
217       catch (Exception JavaDoc e)
218       {
219          throw new RuntimeException JavaDoc("Failure creatig multiplexed Channel", e);
220       }
221    }
222
223     // Constructors --------------------------------------------------
224

/**
 * Creates a partition for the given configuration, sets up its loggers
 * from the partition name and starts the history log.
 *
 * @param config the partition configuration; must not be <code>null</code>
 * @throws IllegalArgumentException if <code>config</code> is <code>null</code>
 */
public ClusterPartition(ClusterPartitionConfig config)
{
   if (config == null)
      throw new IllegalArgumentException("config cannot be null");

   this.config = config;
   setupLoggers(config.getPartitionName());
   this.history = new Vector();
   logHistory ("Partition object created");
}
235
236    // ------------------------------------------------------------ ServiceMBean
237

238    // ----------------------------------------------------------------- Service
239

240    protected void createService() throws Exception JavaDoc
241    {
242       if (config == null)
243          throw new IllegalArgumentException JavaDoc("config cannot be null");
244       log.debug("Creating Multiplexer Channel for partition " + getPartitionName() +
245             " using stack " + getMultiplexerStack());
246       
247       channel = createMuxChannel(config);
248       
249       channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
250       channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
251       
252       log.info("Initializing partition " + getPartitionName());
253       logHistory ("Initializing partition " + getPartitionName());
254       
255       dispatcher = new RpcHandler(channel, null, null, new Object JavaDoc(), config.getDeadlockDetection());
256       
257       // Subscribe to events generated by the org.jgroups. protocol stack
258
log.debug("setMembershipListener");
259       dispatcher.setMembershipListener(this);
260       log.debug("setMessageListener");
261       dispatcher.setMessageListener(messageListener);
262       dispatcher.setMarshaller(new MarshallerImpl());
263       
264       // FIXME remove once @JMX issues are sorted
265
if (replicantManager == null)
266       {
267          // Create the DRM and link it to this HAPartition
268
log.debug("create replicant manager");
269          DistributedReplicantManagerImpl drm = new DistributedReplicantManagerImpl(this);
270          if (server != null)
271             drm.registerWithJmx(server);
272          log.debug("create replicant manager");
273          drm.create();
274          setDistributedReplicantManager(drm);
275          this.selfCreatedDRM = true;
276       }
277       
278 // // Create the DS and link it to this HAPartition
279
// log.debug("create distributed serviceState service");
280
// this.dsManager = new DistributedStateImpl(this, this.server);
281
// log.debug("init distributed serviceState service");
282
// this.dsManager.init();
283

284       // Create the asynchronous handler for view changes
285
asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
286       
287       log.debug("done initializing partition");
288    }
289    
290    protected void startService() throws Exception JavaDoc
291    {
292       logHistory ("Starting partition");
293       
294       // Store our uniqueId in the channel
295
configureUniqueId();
296       
297       channel.connect(getPartitionName());
298       
299       try
300       {
301          // get current JG group properties
302

303          log.debug("get nodeName");
304          this.localJGAddress = (IpAddress)channel.getLocalAddress();
305          this.me = new ClusterNode(this.localJGAddress);
306          this.nodeName = this.me.getName();
307
308          // FIXME -- just block waiting for viewAccepted!
309
log.debug("Get current members");
310          View view = channel.getView();
311          this.jgmembers = (Vector JavaDoc)view.getMembers().clone();
312          this.members = translateAddresses(this.jgmembers); // TRANSLATE
313
log.info("Number of cluster members: " + members.size());
314          for(int m = 0; m > members.size(); m ++)
315          {
316             Object JavaDoc node = members.get(m);
317             log.debug(node);
318          }
319          // Keep a list of other members only for "exclude-self" RPC calls
320

321          this.jgotherMembers = (Vector JavaDoc)view.getMembers().clone();
322          this.jgotherMembers.remove (channel.getLocalAddress());
323          this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
324
log.info ("Other members: " + this.otherMembers.size ());
325
326          verifyNodeIsUnique(view.getMembers());
327
328          // Update the initial view id
329
this.currentViewId = view.getVid().getId();
330
331          // We must now synchronize new serviceState transfer subscriber
332
//
333
fetchState();
334          
335          if (selfCreatedDRM)
336          {
337             // We are now able to start our DRM and DS
338
((DistributedReplicantManagerImpl) this.replicantManager).start();
339          }
340          
341          // Start the asynch listener handler thread
342
asynchHandler.start();
343          
344          // Bind ourself in the public JNDI space
345
Context JavaDoc ctx = new InitialContext JavaDoc();
346          this.bind("/HAPartition/" + getPartitionName(), this, ClusterPartition.class, ctx);
347          log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
348       }
349       catch (Throwable JavaDoc t)
350       {
351          log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
352          channel.disconnect();
353          throw (t instanceof Exception JavaDoc) ? (Exception JavaDoc) t : new RuntimeException JavaDoc(t);
354       }
355       
356    }
357
358    protected void stopService() throws Exception JavaDoc
359    {
360       logHistory ("Stopping partition");
361       log.info("Stopping partition " + getPartitionName());
362
363       stopChannelDebugger();
364       
365       try
366       {
367          asynchHandler.stop();
368       }
369       catch( Exception JavaDoc e)
370       {
371          log.warn("Failed to stop asynchHandler", e);
372       }
373
374       
375       if (selfCreatedDRM)
376       {
377          // Stop the DRM service
378
// TODO remove when DRM is independent
379
try
380          {
381             ((DistributedReplicantManagerImpl) this.replicantManager).stop();
382          }
383          catch (Exception JavaDoc e)
384          {
385             log.error("operation failed", e);
386          }
387       }
388
389 // NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
390
// add the destroyPartition() step
391
try
392       {
393          channel.disconnect();
394       }
395       catch (Exception JavaDoc e)
396       {
397          log.error("operation failed", e);
398       }
399
400      String JavaDoc boundName = "/HAPartition/" + getPartitionName();
401
402      InitialContext JavaDoc ctx = new InitialContext JavaDoc();
403      try
404      {
405         ctx.unbind(boundName);
406      }
407      finally
408      {
409         ctx.close();
410      }
411
412      NonSerializableFactory.unbind (boundName);
413
414      log.info("Partition " + getPartitionName() + " stopped.");
415    }
416    
417    protected void destroyService() throws Exception JavaDoc
418    {
419       log.debug("Destroying HAPartition: " + getPartitionName());
420       
421       if (selfCreatedDRM)
422       {
423          try
424          {
425             if (server != null)
426                ((DistributedReplicantManagerImpl) replicantManager).unregisterWithJmx(server);
427             ((DistributedReplicantManagerImpl) replicantManager).destroy();
428          }
429          catch (Exception JavaDoc e)
430          {
431             log.error("Destroying DRM failed", e);
432          }
433       }
434       try
435       {
436          channel.close();
437       }
438       catch (Exception JavaDoc e)
439       {
440          log.error("Closing channel failed", e);
441       }
442
443       log.info("Partition " + getPartitionName() + " destroyed.");
444    }
445    
446    // ---------------------------------------------------------- State Transfer
447

448
449    protected void fetchState() throws Exception JavaDoc
450    {
451       log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() +
452             " milliseconds):");
453       long start, stop;
454       isStateSet = false;
455       start = System.currentTimeMillis();
456       boolean rc = channel.getState(null, getStateTransferTimeout());
457       if (rc)
458       {
459          synchronized (stateLock)
460          {
461             while (!isStateSet)
462             {
463                if (setStateException != null)
464                   throw setStateException;
465
466                try
467                {
468                   stateLock.wait();
469                }
470                catch (InterruptedException JavaDoc iex)
471                {
472                }
473             }
474          }
475          stop = System.currentTimeMillis();
476          log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
477       }
478       else
479       {
480          // No one provided us with serviceState.
481
// We need to find out if we are the coordinator, so we must
482
// block until viewAccepted() is called at least once
483

484          synchronized (members)
485          {
486             while (members.size() == 0)
487             {
488                log.debug("waiting on viewAccepted()");
489                try
490                {
491                   members.wait();
492                }
493                catch (InterruptedException JavaDoc iex)
494                {
495                }
496             }
497          }
498
499          if (isCurrentNodeCoordinator())
500          {
501             log.info("State could not be retrieved (we are the first member in group)");
502          }
503          else
504          {
505             throw new IllegalStateException JavaDoc("Initial serviceState transfer failed: " +
506                "Channel.getState() returned false");
507          }
508       }
509    }
510
511    private void getStateInternal(OutputStream JavaDoc stream) throws IOException JavaDoc
512    {
513       MarshalledValueOutputStream mvos = null; // don't create until we know we need it
514

515       for (Iterator JavaDoc keys = stateHandlers.entrySet().iterator(); keys.hasNext(); )
516       {
517          Map.Entry JavaDoc entry = (Map.Entry JavaDoc)keys.next();
518          HAPartition.HAPartitionStateTransfer subscriber =
519             (HAPartition.HAPartitionStateTransfer) entry.getValue();
520          log.debug("getState for " + entry.getKey());
521          Object JavaDoc state = subscriber.getCurrentState();
522          if (state != null)
523          {
524             if (mvos == null)
525             {
526                // This is our first write, so need to write the header first
527
stream.write(SERIALIZABLE_VALUE);
528                
529                mvos = new MarshalledValueOutputStream(stream);
530             }
531             
532             mvos.writeObject(entry.getKey());
533             mvos.writeObject(state);
534          }
535       }
536       
537       if (mvos == null)
538       {
539          // We never wrote any serviceState, so write the NULL header
540
stream.write(NULL_VALUE);
541       }
542       else
543       {
544          mvos.writeObject(new StateStreamEnd());
545       }
546       
547    }
548    
549    private void setStateInternal(InputStream JavaDoc stream) throws IOException JavaDoc, ClassNotFoundException JavaDoc
550    {
551       byte type = (byte) stream.read();
552          
553       if (type == NULL_VALUE)
554       {
555          log.debug("serviceState is null");
556          return;
557       }
558       
559       long used_mem_before, used_mem_after;
560       Runtime JavaDoc rt=Runtime.getRuntime();
561       used_mem_before=rt.totalMemory() - rt.freeMemory();
562       
563       MarshalledValueInputStream mvis = new MarshalledValueInputStream(stream);
564       
565       while (true)
566       {
567          Object JavaDoc obj = mvis.readObject();
568          if (obj instanceof StateStreamEnd)
569             break;
570          
571          String JavaDoc key = (String JavaDoc) obj;
572          log.debug("setState for " + key);
573          Object JavaDoc someState = mvis.readObject();
574          HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
575          if (subscriber != null)
576          {
577             try
578             {
579                subscriber.setCurrentState((Serializable JavaDoc)someState);
580             }
581             catch (Exception JavaDoc e)
582             {
583                // Don't let issues with one subscriber affect others
584
// unless it is DRM, which is really an internal function
585
// of the HAPartition
586
// FIXME remove this once DRM is JBC-based
587
if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
588                {
589                   if (e instanceof RuntimeException JavaDoc)
590                      throw (RuntimeException JavaDoc) e;
591                   else
592                      throw new RuntimeException JavaDoc(e);
593                }
594                else
595                {
596                   log.error("Caught exception setting serviceState to " + subscriber, e);
597                }
598             }
599          }
600          else
601          {
602             log.debug("There is no stateHandler for: " + key);
603          }
604       }
605
606       used_mem_after=rt.totalMemory() - rt.freeMemory();
607       log.debug("received serviceState; expanded memory by " +
608             (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
609             ", used memory after: " + used_mem_after + ")");
610    }
611
612    private void recordSetStateFailure(Throwable JavaDoc t)
613    {
614       log.error("failed setting serviceState", t);
615       if (t instanceof Exception JavaDoc)
616          setStateException = (Exception JavaDoc) t;
617       else
618          setStateException = new Exception JavaDoc(t);
619    }
620
621    private void notifyStateTransferCompleted()
622    {
623       synchronized (stateLock)
624       {
625          // Notify wait that serviceState has been set.
626
stateLock.notifyAll();
627       }
628    }
629    
630    // org.jgroups.MembershipListener implementation ----------------------------------------------
631

632    public void suspect(org.jgroups.Address suspected_mbr)
633    {
634       logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
635       if (isCurrentNodeCoordinator ())
636          clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
637       else
638          log.info("Suspected member: " + suspected_mbr);
639    }
640
641    public void block() {}
642    
643    /** Notification of a cluster view change. This is done from the JG protocol
644     * handlder thread and we must be careful to not unduly block this thread.
645     * Because of this there are two types of listeners, synchronous and
646     * asynchronous. The synchronous listeners are messaged with the view change
647     * event using the calling thread while the asynchronous listeners are
648     * messaged using a seperate thread.
649     *
650     * @param newView
651     */

652    public void viewAccepted(View newView)
653    {
654       try
655       {
656          // we update the view id
657
//
658
this.currentViewId = newView.getVid().getId();
659
660          // Keep a list of other members only for "exclude-self" RPC calls
661
//
662
this.jgotherMembers = (Vector JavaDoc)newView.getMembers().clone();
663          this.jgotherMembers.remove (channel.getLocalAddress());
664          this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
665
Vector JavaDoc translatedNewView = translateAddresses ((Vector JavaDoc)newView.getMembers().clone());
666          logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
667                      " (old view: " + this.members + " )");
668
669
670          // Save the previous view and make a copy of the new view
671
Vector JavaDoc oldMembers = this.members;
672
673          Vector JavaDoc newjgMembers = (Vector JavaDoc)newView.getMembers().clone();
674          Vector JavaDoc newMembers = translateAddresses(newjgMembers); // TRANSLATE
675
if (this.members == null)
676          {
677             // Initial viewAccepted
678
this.members = newMembers;
679             this.jgmembers = newjgMembers;
680             log.debug("ViewAccepted: initial members set");
681             return;
682          }
683          this.members = newMembers;
684          this.jgmembers = newjgMembers;
685
686          int difference = 0;
687          if (oldMembers == null)
688             difference = newMembers.size () - 1;
689          else
690             difference = newMembers.size () - oldMembers.size ();
691          
692          if (isCurrentNodeCoordinator ())
693             clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " +
694                                       this.currentViewId + ", delta: " + difference + ") : " + this.members);
695          else
696             log.info("New cluster view for partition " + getPartitionName() + ": " +
697                      this.currentViewId + " (" + this.members + " delta: " + difference + ")");
698
699          // Build a ViewChangeEvent for the asynch listeners
700
ViewChangeEvent event = new ViewChangeEvent();
701          event.viewId = currentViewId;
702          event.allMembers = translatedNewView;
703          event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
704          event.newMembers = getNewMembers(oldMembers, event.allMembers);
705          event.originatingGroups = null;
706          // if the new view occurs because of a merge, we first inform listeners of the merge
707
if(newView instanceof MergeView)
708          {
709             MergeView mergeView = (MergeView) newView;
710             event.originatingGroups = mergeView.getSubgroups();
711          }
712
713          log.debug("membership changed from " + this.members.size() + " to "
714             + event.allMembers.size());
715          // Put the view change to the asynch queue
716
this.asynchHandler.queueEvent(event);
717
718          // Broadcast the new view to the synchronous view change listeners
719
if (this.allowSyncListeners)
720          {
721             this.notifyListeners(synchListeners, event.viewId, event.allMembers,
722                   event.deadMembers, event.newMembers, event.originatingGroups);
723          }
724       }
725       catch (Exception JavaDoc ex)
726       {
727          log.error("ViewAccepted failed", ex);
728       }
729    }
730
731    // HAPartition implementation ----------------------------------------------
732

733    public String JavaDoc getNodeName()
734    {
735       return nodeName;
736    }
737    
738    public String JavaDoc getPartitionName()
739    {
740       return (config == null ? null : config.getPartitionName());
741    }
742    
743    public DistributedReplicantManager getDistributedReplicantManager()
744    {
745       return replicantManager;
746    }
747    
748    public DistributedState getDistributedStateService()
749    {
750       return config.getDistributedState();
751    }
752
753    public long getCurrentViewId()
754    {
755       return this.currentViewId;
756    }
757    
758    public Vector JavaDoc getCurrentView()
759    {
760       Vector JavaDoc result = new Vector JavaDoc (this.members.size());
761       for (int i = 0; i < members.size(); i++)
762       {
763          result.add( ((ClusterNode) members.elementAt(i)).getName() );
764       }
765       return result;
766    }
767
768    public ClusterNode[] getClusterNodes ()
769    {
770       ClusterNode[] nodes = new ClusterNode[this.members.size()];
771       this.members.toArray(nodes);
772       return nodes;
773    }
774
775    public ClusterNode getClusterNode ()
776    {
777       return me;
778    }
779
780    public boolean isCurrentNodeCoordinator ()
781    {
782       if(this.members == null || this.members.size() == 0 || this.me == null)
783          return false;
784      return this.members.elementAt (0).equals (this.me);
785    }
786
// ***************************
// ***************************
// RPC multicast communication
// ***************************
// ***************************
//
793
public void registerRPCHandler(String JavaDoc objName, Object JavaDoc subscriber)
794    {
795       rpcHandlers.put(objName, subscriber);
796    }
797    
798    public void unregisterRPCHandler(String JavaDoc objName, Object JavaDoc subscriber)
799    {
800       rpcHandlers.remove(objName);
801    }
802       
803
804    /**
805     *
806     * @param objName
807     * @param methodName
808     * @param args
809     * @param excludeSelf
810     * @return
811     * @throws Exception
812     * @deprecated Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead
813     */

814    public ArrayList JavaDoc callMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
815       Object JavaDoc[] args, boolean excludeSelf) throws Exception JavaDoc
816    {
817       return callMethodOnCluster(objName, methodName, args, null, excludeSelf);
818    }
819
820    /**
821     * This function is an abstraction of RpcDispatcher.
822     */

823    public ArrayList JavaDoc callMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
824       Object JavaDoc[] args, Class JavaDoc[] types, boolean excludeSelf) throws Exception JavaDoc
825    {
826       return callMethodOnCluster(objName, methodName, args, types, excludeSelf, getMethodCallTimeout());
827    }
828
829
830    public ArrayList JavaDoc callMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
831        Object JavaDoc[] args, Class JavaDoc[] types, boolean excludeSelf, long methodTimeout) throws Exception JavaDoc
832    {
833       ArrayList JavaDoc rtn = new ArrayList JavaDoc();
834       MethodCall m=null;
835       RspList rsp = null;
836       boolean trace = log.isTraceEnabled();
837
838       if(types != null)
839          m=new MethodCall(objName + "." + methodName, args, types);
840       else
841          m=new MethodCall(objName + "." + methodName, args);
842
843       if (excludeSelf)
844       {
845          if( trace )
846          {
847             log.trace("callMethodOnCluster(true), objName="+objName
848                +", methodName="+methodName+", members="+jgotherMembers);
849          }
850          rsp = dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
851       }
852       else
853       {
854          if( trace )
855          {
856             log.trace("callMethodOnCluster(false), objName="+objName
857                +", methodName="+methodName+", members="+members);
858          }
859          rsp = dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
860       }
861
862       if (rsp != null)
863       {
864          for (int i = 0; i < rsp.size(); i++)
865          {
866             Object JavaDoc item = rsp.elementAt(i);
867             if (item instanceof Rsp)
868             {
869                Rsp response = (Rsp) item;
870                // Only include received responses
871
boolean wasReceived = response.wasReceived();
872                if( wasReceived == true )
873                {
874                   item = response.getValue();
875                   if (!(item instanceof NoHandlerForRPC))
876                      rtn.add(item);
877                }
878                else if( trace )
879                   log.trace("Ignoring non-received response: "+response);
880             }
881             else
882             {
883                if (!(item instanceof NoHandlerForRPC))
884                   rtn.add(item);
885                else if( trace )
886                   log.trace("Ignoring NoHandlerForRPC");
887             }
888          }
889       }
890
891       return rtn;
892     }
893
894    /**
895     * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
896     * cluster.
897     * and is replaced
898     * @param objName
899     * @param methodName
900     * @param args
901     * @param types
902     * @param excludeSelf
903     * @return
904     * @throws Exception
905     */

906    public ArrayList JavaDoc callMethodOnCoordinatorNode(String JavaDoc objName, String JavaDoc methodName,
907           Object JavaDoc[] args, Class JavaDoc[] types,boolean excludeSelf) throws Exception JavaDoc
908    {
909       return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, getMethodCallTimeout());
910    }
911
912    /**
913     * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
914     * cluster.
915     * and is replaced
916     * @param objName
917     * @param methodName
918     * @param args
919     * @param types
920     * @param excludeSelf
921     * @param methodTimeout
922     * @return
923     * @throws Exception
924     */

925    public ArrayList JavaDoc callMethodOnCoordinatorNode(String JavaDoc objName, String JavaDoc methodName,
926           Object JavaDoc[] args, Class JavaDoc[] types,boolean excludeSelf, long methodTimeout) throws Exception JavaDoc
927    {
928       ArrayList JavaDoc rtn = new ArrayList JavaDoc();
929       MethodCall m=null;
930       RspList rsp = null;
931       boolean trace = log.isTraceEnabled();
932
933       if(types != null)
934          m=new MethodCall(objName + "." + methodName, args, types);
935       else
936          m=new MethodCall(objName + "." + methodName, args);
937
938       if( trace )
939       {
940          log.trace("callMethodOnCoordinatorNode(false), objName="+objName
941             +", methodName="+methodName);
942       }
943
944       // the first cluster view member is the coordinator
945
Vector JavaDoc coordinatorOnly = new Vector JavaDoc();
946       // If we are the coordinator, only call ourself if 'excludeSelf' is false
947
if (false == isCurrentNodeCoordinator () ||
948           false == excludeSelf)
949          coordinatorOnly.addElement(this.jgmembers.elementAt (0));
950
951       rsp = dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
952
953       if (rsp != null)
954       {
955          for (int i = 0; i < rsp.size(); i++)
956          {
957             Object JavaDoc item = rsp.elementAt(i);
958             if (item instanceof Rsp)
959             {
960                Rsp response = (Rsp) item;
961                // Only include received responses
962
boolean wasReceived = response.wasReceived();
963                if( wasReceived == true )
964                {
965                   item = response.getValue();
966                   if (!(item instanceof NoHandlerForRPC))
967                      rtn.add(item);
968                }
969                else if( trace )
970                   log.trace("Ignoring non-received response: "+response);
971             }
972             else
973             {
974                if (!(item instanceof NoHandlerForRPC))
975                   rtn.add(item);
976                else if( trace )
977                   log.trace("Ignoring NoHandlerForRPC");
978             }
979          }
980       }
981
982       return rtn;
983    }
984
985
986    /**
987     *
988     * @param objName
989     * @param methodName
990     * @param args
991     * @param excludeSelf
992     * @throws Exception
993     * @deprecated Use {@link #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)} instead
994     */

995    public void callAsynchMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
996       Object JavaDoc[] args, boolean excludeSelf)
997       throws Exception JavaDoc
998    {
999       callAsynchMethodOnCluster(objName, methodName, args, null, excludeSelf);
1000   }
1001
1002   /**
1003    * This function is an abstraction of RpcDispatcher for asynchronous messages
1004    */

1005   public void callAsynchMethodOnCluster(String JavaDoc objName, String JavaDoc methodName,
1006      Object JavaDoc[] args, Class JavaDoc[] types, boolean excludeSelf) throws Exception JavaDoc
1007   {
1008      MethodCall m = null;
1009      boolean trace = log.isTraceEnabled();
1010
1011      if(types != null)
1012         m=new MethodCall(objName + "." + methodName, args, types);
1013      else
1014         m=new MethodCall(objName + "." + methodName, args);
1015
1016      if (excludeSelf)
1017      {
1018         if( trace )
1019         {
1020            log.trace("callAsynchMethodOnCluster(true), objName="+objName
1021               +", methodName="+methodName+", members="+jgotherMembers);
1022         }
1023         dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout());
1024      }
1025      else
1026      {
1027         if( trace )
1028         {
1029            log.trace("callAsynchMethodOnCluster(false), objName="+objName
1030               +", methodName="+methodName+", members="+members);
1031         }
1032         dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout());
1033      }
1034   }
1035   
1036   // *************************
1037
// *************************
1038
// State transfer management
1039
// *************************
1040
// *************************
1041
//
1042
public void subscribeToStateTransferEvents(String JavaDoc objectName, HAPartitionStateTransfer subscriber)
1043   {
1044      stateHandlers.put(objectName, subscriber);
1045   }
1046   
1047   public void unsubscribeFromStateTransferEvents(String JavaDoc objectName, HAPartitionStateTransfer subscriber)
1048   {
1049      stateHandlers.remove(objectName);
1050   }
1051   
1052   // *************************
1053
// *************************
1054
// Group Membership listeners
1055
// *************************
1056
// *************************
1057
//
1058
public void registerMembershipListener(HAMembershipListener listener)
1059   {
1060      boolean isAsynch = (this.allowSyncListeners == false)
1061            || (listener instanceof AsynchHAMembershipListener)
1062            || (listener instanceof AsynchHAMembershipExtendedListener);
1063      if( isAsynch ) {
1064         synchronized(this.asynchListeners) {
1065            this.asynchListeners.add(listener);
1066         }
1067      }
1068      else {
1069         synchronized(this.synchListeners) {
1070            this.synchListeners.add(listener);
1071         }
1072      }
1073   }
1074   
1075   public void unregisterMembershipListener(HAMembershipListener listener)
1076   {
1077      boolean isAsynch = (this.allowSyncListeners == false)
1078            || (listener instanceof AsynchHAMembershipListener)
1079            || (listener instanceof AsynchHAMembershipExtendedListener);
1080      if( isAsynch ) {
1081         synchronized(this.asynchListeners) {
1082            this.asynchListeners.remove(listener);
1083         }
1084      }
1085      else {
1086         synchronized(this.synchListeners) {
1087            this.synchListeners.remove(listener);
1088         }
1089      }
1090   }
1091   
1092   public boolean getAllowSynchronousMembershipNotifications()
1093   {
1094      return allowSyncListeners;
1095   }
1096
1097   public void setAllowSynchronousMembershipNotifications(boolean allowSync)
1098   {
1099      this.allowSyncListeners = allowSync;
1100   }
1101   
1102   // AsynchEventHandler.AsynchEventProcessor -----------------------
1103

1104   public void processEvent(Object JavaDoc event)
1105   {
1106      ViewChangeEvent vce = (ViewChangeEvent) event;
1107      notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
1108            vce.deadMembers, vce.newMembers, vce.originatingGroups);
1109      
1110   }
1111   
1112   
1113   // Public ------------------------------------------------------------------
1114

1115   public void setDistributedReplicantManager(DistributedReplicantManager drm)
1116   {
1117      if (this.replicantManager != null && !(replicantManager == drm))
1118         throw new IllegalStateException JavaDoc("DistributedReplicantManager already set");
1119      
1120      this.replicantManager = drm;
1121   }
1122   
1123   // Protected -----------------------------------------------------
1124

1125   protected void verifyNodeIsUnique (Vector JavaDoc javaGroupIpAddresses) throws Exception JavaDoc
1126   {
1127      byte[] localUniqueName = this.localJGAddress.getAdditionalData();
1128      if (localUniqueName == null)
1129      {
1130         log.error("No additional information has been found in the JGroups address; " +
1131                  "make sure you are running with a correct version of JGroups and that the protocols " +
1132                  "you are using support 'additionalData' behaviour.");
1133         throw new Exception JavaDoc ("Local node (" + this.localJGAddress + ") removed from cluster; local node name is missing.");
1134      }
1135
1136      for (int i = 0; i < javaGroupIpAddresses.size(); i++)
1137      {
1138         IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
1139         if (!address.equals(this.localJGAddress))
1140         {
1141            if (localUniqueName.equals(address.getAdditionalData()))
1142               throw new Exception JavaDoc ("Local node (" + this.localJGAddress + ") removed from cluster; another node (" + address + ") publicizing the same name was already there.");
1143         }
1144      }
1145   }
1146
1147   /**
1148    * Helper method that binds the partition in the JNDI tree.
1149    * @param jndiName Name under which the object must be bound
1150    * @param who Object to bind in JNDI
1151    * @param classType Class type under which should appear the bound object
1152    * @param ctx Naming context under which we bind the object
1153    * @throws Exception Thrown if a naming exception occurs during binding
1154    */

1155   protected void bind(String JavaDoc jndiName, Object JavaDoc who, Class JavaDoc classType, Context JavaDoc ctx) throws Exception JavaDoc
1156   {
1157      // Ah ! This service isn't serializable, so we use a helper class
1158
//
1159
NonSerializableFactory.bind(jndiName, who);
1160      Name JavaDoc n = ctx.getNameParser("").parse(jndiName);
1161      while (n.size () > 1)
1162      {
1163         String JavaDoc ctxName = n.get (0);
1164         try
1165         {
1166            ctx = (Context JavaDoc)ctx.lookup (ctxName);
1167         }
1168         catch (NameNotFoundException JavaDoc e)
1169         {
1170            log.debug ("creating Subcontext " + ctxName);
1171            ctx = ctx.createSubcontext (ctxName);
1172         }
1173         n = n.getSuffix (1);
1174      }
1175
1176      // The helper class NonSerializableFactory uses address type nns, we go on to
1177
// use the helper class to bind the service object in JNDI
1178
//
1179
StringRefAddr JavaDoc addr = new StringRefAddr JavaDoc("nns", jndiName);
1180      Reference JavaDoc ref = new Reference JavaDoc(classType.getName (), addr, NonSerializableFactory.class.getName (), null);
1181      ctx.rebind (n.get (0), ref);
1182   }
1183   
1184   /**
1185    * Helper method that returns a vector of dead members from two input vectors: new and old vectors of two views.
1186    * Dead members are old - new members.
1187    * @param oldMembers Vector of old members
1188    * @param newMembers Vector of new members
1189    * @return Vector of members that have died between the two views, can be empty.
1190    */

1191   protected Vector JavaDoc getDeadMembers(Vector JavaDoc oldMembers, Vector JavaDoc newMembers)
1192   {
1193      if(oldMembers == null) oldMembers=new Vector JavaDoc();
1194      if(newMembers == null) newMembers=new Vector JavaDoc();
1195      Vector JavaDoc dead=(Vector JavaDoc)oldMembers.clone();
1196      dead.removeAll(newMembers);
1197      log.debug("dead members: " + dead);
1198      return dead;
1199   }
1200   
1201   /**
1202    * Helper method that returns a vector of new members from two input vectors: new and old vectors of two views.
1203    * @param oldMembers Vector of old members
1204    * @param allMembers Vector of new members
1205    * @return Vector of members that have joined the partition between the two views
1206    */

1207   protected Vector JavaDoc getNewMembers(Vector JavaDoc oldMembers, Vector JavaDoc allMembers)
1208   {
1209      if(oldMembers == null) oldMembers=new Vector JavaDoc();
1210      if(allMembers == null) allMembers=new Vector JavaDoc();
1211      Vector JavaDoc newMembers=(Vector JavaDoc)allMembers.clone();
1212      newMembers.removeAll(oldMembers);
1213      return newMembers;
1214   }
1215
1216   protected void notifyListeners(ArrayList JavaDoc theListeners, long viewID,
1217      Vector JavaDoc allMembers, Vector JavaDoc deadMembers, Vector JavaDoc newMembers,
1218      Vector JavaDoc originatingGroups)
1219   {
1220      log.debug("Begin notifyListeners, viewID: "+viewID);
1221      synchronized(theListeners)
1222      {
1223         // JBAS-3619 -- don't hold synch lock while notifying
1224
theListeners = (ArrayList JavaDoc) theListeners.clone();
1225      }
1226      
1227      for (int i = 0; i < theListeners.size(); i++)
1228      {
1229         HAMembershipListener aListener = null;
1230         try
1231         {
1232            aListener = (HAMembershipListener) theListeners.get(i);
1233            if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
1234            {
1235               HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
1236               exListener.membershipChangedDuringMerge (deadMembers, newMembers,
1237                  allMembers, originatingGroups);
1238            }
1239            else
1240            {
1241               aListener.membershipChanged(deadMembers, newMembers, allMembers);
1242            }
1243         }
1244         catch (Throwable JavaDoc e)
1245         {
1246            // a problem in a listener should not prevent other members to receive the new view
1247
log.warn("HAMembershipListener callback failure: "+aListener, e);
1248         }
1249      }
1250      
1251      log.debug("End notifyListeners, viewID: "+viewID);
1252   }
1253
1254   protected Vector JavaDoc translateAddresses (Vector JavaDoc jgAddresses)
1255   {
1256      if (jgAddresses == null)
1257         return null;
1258
1259      Vector JavaDoc result = new Vector JavaDoc (jgAddresses.size());
1260      for (int i = 0; i < jgAddresses.size(); i++)
1261      {
1262         IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
1263         result.add(new ClusterNode (addr));
1264      }
1265
1266      return result;
1267   }
1268
1269   public void logHistory (String JavaDoc message)
1270   {
1271      try
1272      {
1273         history.add(new SimpleDateFormat JavaDoc().format (new Date JavaDoc()) + " : " + message);
1274      }
1275      catch (Exception JavaDoc ignored){}
1276   }
1277
1278   // --------------------------------------------------- ClusterPartitionMBean
1279

1280   public String JavaDoc showHistory ()
1281   {
1282      StringBuffer JavaDoc buff = new StringBuffer JavaDoc();
1283      Vector JavaDoc data = new Vector JavaDoc (this.history);
1284      for (java.util.Iterator JavaDoc row = data.iterator(); row.hasNext();)
1285      {
1286         String JavaDoc info = (String JavaDoc) row.next();
1287         buff.append(info).append("\n");
1288      }
1289      return buff.toString();
1290   }
1291
1292   public String JavaDoc showHistoryAsXML ()
1293   {
1294      StringBuffer JavaDoc buff = new StringBuffer JavaDoc();
1295      buff.append("<events>\n");
1296      Vector JavaDoc data = new Vector JavaDoc (this.history);
1297      for (java.util.Iterator JavaDoc row = data.iterator(); row.hasNext();)
1298      {
1299         buff.append(" <event>\n ");
1300         String JavaDoc info = (String JavaDoc) row.next();
1301         buff.append(info);
1302         buff.append("\n </event>\n");
1303      }
1304      buff.append("</events>\n");
1305      return buff.toString();
1306   }
1307
1308   public void startChannelDebugger()
1309   {
1310      startChannelDebugger(false);
1311   }
1312
1313   public void startChannelDebugger(boolean accumulative)
1314   {
1315      if(debugger == null)
1316      {
1317         debugger=new Debugger(this.channel, accumulative);
1318         debugger.start();
1319      }
1320   }
1321
1322   public void stopChannelDebugger()
1323   {
1324      if(debugger != null)
1325      {
1326         // debugger.stop(); // uncomment when new JGroups version is available
1327
debugger=null;
1328      }
1329   }
1330   
1331   public Cache getClusteredCache()
1332   {
1333      return config.getClusteredCache();
1334   }
1335
1336   public boolean getDeadlockDetection()
1337   {
1338      return config.getDeadlockDetection();
1339   }
1340
1341   public HAPartition getHAPartition()
1342   {
1343      return this;
1344   }
1345
1346   public String JavaDoc getJGroupsVersion()
1347   {
1348      return Version.description + "( " + Version.cvs + ")";
1349   }
1350
1351   public JChannelFactoryMBean getMultiplexer()
1352   {
1353      return config.getMultiplexer();
1354   }
1355
1356   public String JavaDoc getMultiplexerStack()
1357   {
1358      return config.getMultiplexerStack();
1359   }
1360
1361   public InetAddress JavaDoc getNodeAddress()
1362   {
1363      return config.getNodeAddress();
1364   }
1365
1366   public long getStateTransferTimeout() {
1367      return config.getStateTransferTimeout();
1368   }
1369
1370   public long getMethodCallTimeout() {
1371      return config.getMethodCallTimeout();
1372   }
1373
1374   public void setMethodCallTimeout(long timeout)
1375   {
1376      config.setMethodCallTimeout(timeout);
1377   }
1378
1379   public void setStateTransferTimeout(long timeout)
1380   {
1381      config.setStateTransferTimeout(timeout);
1382   }
1383
1384   public String JavaDoc getNodeUniqueId()
1385   {
1386      return config.getNodeUniqueId();
1387   }
1388
1389   // Protected --------------------------------------------------------------
1390

1391   protected void configureUniqueId() throws Exception JavaDoc
1392   {
1393      // We push the independent name in the protocol stack
1394
// before connecting to the cluster
1395
boolean pushNodeName = true;
1396      String JavaDoc uniqueId = config.getNodeUniqueId();
1397      if (uniqueId == null || "".equals(uniqueId)) {
1398         IpAddress ourAddr = (IpAddress) channel.getLocalAddress();
1399         if (ourAddr != null)
1400         {
1401            byte[] additional_data = ourAddr.getAdditionalData();
1402            if (additional_data != null)
1403            {
1404               uniqueId = new String JavaDoc(additional_data);
1405               config.setNodeUniqueId(uniqueId);
1406               pushNodeName = false;
1407            }
1408         }
1409      }
1410      if (uniqueId == null || "".equals(uniqueId)) {
1411         uniqueId = generateUniqueId();
1412         config.setNodeUniqueId(uniqueId);
1413      }
1414      
1415      if (pushNodeName)
1416      {
1417         java.util.HashMap JavaDoc staticNodeName = new java.util.HashMap JavaDoc();
1418         staticNodeName.put("additional_data", uniqueId.getBytes());
1419         this.channel.down(new Event(Event.CONFIG, staticNodeName));
1420      }
1421      
1422      config.setNodeUniqueId(uniqueId);
1423   }
1424   
1425   protected String JavaDoc generateUniqueId() throws Exception JavaDoc
1426   {
1427      // we first try to find a simple meaningful name:
1428
// 1st) "local-IP:JNDI_PORT" if JNDI is running on this machine
1429
// 2nd) "local-IP:JMV_GUID" otherwise
1430
// 3rd) return a fully GUID-based representation
1431
//
1432

1433      // Before anything we determine the local host IP (and NOT name as this could be
1434
// resolved differently by other nodes...)
1435

1436      // But use the specified node address for multi-homing
1437

1438      String JavaDoc hostIP = null;
1439      InetAddress JavaDoc address = ServerConfigUtil.fixRemoteAddress(config.getNodeAddress());
1440      if (address == null)
1441      {
1442         log.debug ("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)");
1443         log.debug ("using a full GUID strategy");
1444         return new VMID JavaDoc().toString();
1445      }
1446      else
1447      {
1448         hostIP = address.getHostAddress();
1449      }
1450
1451      // 1st: is JNDI up and running?
1452
int namingPort = config.getNamingServicePort();
1453      if (namingPort > 0)
1454      {
1455         return hostIP + ":" + namingPort;
1456      }
1457
1458      // 2nd: host-GUID strategy
1459
//
1460
String JavaDoc uid = new UID JavaDoc().toString();
1461      return hostIP + ":" + uid;
1462   }
1463   
1464   // Private -------------------------------------------------------
1465

1466   // Inner classes -------------------------------------------------
1467

1468   private class MessageListenerAdapter
1469         implements ExtendedMessageListener
1470   {
1471      
1472      public void getState(OutputStream JavaDoc stream)
1473      {
1474         logHistory ("getState called on partition");
1475         
1476         log.debug("getState called.");
1477         try
1478         {
1479            getStateInternal(stream);
1480         }
1481         catch (Exception JavaDoc ex)
1482         {
1483            log.error("getState failed", ex);
1484         }
1485         
1486      }
1487      
1488      public void getState(String JavaDoc state_id, OutputStream JavaDoc ostream)
1489      {
1490         throw new UnsupportedOperationException JavaDoc("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
1491      }
1492
1493      public byte[] getState(String JavaDoc state_id)
1494      {
1495         throw new UnsupportedOperationException JavaDoc("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
1496      }
1497      
1498      public void setState(InputStream JavaDoc stream)
1499      {
1500         logHistory ("setState called on partition");
1501         try
1502         {
1503            if (stream == null)
1504            {
1505               log.debug("transferred serviceState is null (may be first member in cluster)");
1506            }
1507            else
1508            {
1509               setStateInternal(stream);
1510            }
1511            
1512            isStateSet = true;
1513         }
1514         catch (Throwable JavaDoc t)
1515         {
1516            recordSetStateFailure(t);
1517         }
1518         finally
1519         {
1520            notifyStateTransferCompleted();
1521         }
1522      }
1523
1524      public byte[] getState()
1525      {
1526         logHistory ("getState called on partition");
1527         
1528         log.debug("getState called.");
1529         try
1530         {
1531            ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc(1024);
1532            getStateInternal(baos);
1533            return baos.toByteArray();
1534         }
1535         catch (Exception JavaDoc ex)
1536         {
1537            log.error("getState failed", ex);
1538         }
1539         return null; // This will cause the receiver to get a "false" on the channel.getState() call
1540
}
1541
1542      public void setState(String JavaDoc state_id, byte[] state)
1543      {
1544         throw new UnsupportedOperationException JavaDoc("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
1545      }
1546
1547      public void setState(String JavaDoc state_id, InputStream JavaDoc istream)
1548      {
1549         throw new UnsupportedOperationException JavaDoc("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
1550      }
1551
1552      public void receive(org.jgroups.Message msg)
1553      { /* complete */}
1554      
1555      public void setState(byte[] obj)
1556      {
1557         logHistory ("setState called on partition");
1558         try
1559         {
1560            if (obj == null)
1561            {
1562               log.debug("transferred serviceState is null (may be first member in cluster)");
1563            }
1564            else
1565            {
1566               ByteArrayInputStream JavaDoc bais = new ByteArrayInputStream JavaDoc(obj);
1567               setStateInternal(bais);
1568               bais.close();
1569            }
1570            
1571            isStateSet = true;
1572         }
1573         catch (Throwable JavaDoc t)
1574         {
1575            recordSetStateFailure(t);
1576         }
1577         finally
1578         {
1579            notifyStateTransferCompleted();
1580         }
1581      }
1582      
1583   }
1584
1585   /**
1586    * A simple data class containing the view change event needed to
1587    * notify the HAMembershipListeners
1588    */

1589   private static class ViewChangeEvent
1590   {
1591      long viewId;
1592      Vector JavaDoc deadMembers;
1593      Vector JavaDoc newMembers;
1594      Vector JavaDoc allMembers;
1595      Vector JavaDoc originatingGroups;
1596   }
1597   
1598   private static class MarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
1599   {
1600
1601      public Object JavaDoc objectFromByteBuffer(byte[] buf) throws Exception JavaDoc
1602      {
1603         return ClusterPartition.objectFromByteBuffer(buf);
1604      }
1605
1606      public byte[] objectToByteBuffer(Object JavaDoc obj) throws Exception JavaDoc
1607      {
1608         return ClusterPartition.objectToByteBuffer(obj);
1609      }
1610   }
1611   
1612   /**
1613    * Overrides RpcDispatcher.Handle so that we can dispatch to many
1614    * different objects.
1615    */

1616   private class RpcHandler extends RpcDispatcher
1617   {
1618      private RpcHandler(Channel channel, MessageListener l, MembershipListener l2, Object JavaDoc server_obj,
1619            boolean deadlock_detection)
1620      {
1621         super(channel, l, l2, server_obj, deadlock_detection);
1622      }
1623      
1624      /**
1625       * Analyze the MethodCall contained in <code>req</code> to find the
1626       * registered service object to invoke against, and then execute it
1627       * against *that* object and return result.
1628       *
1629       * This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.
1630       * @param req The org.jgroups. representation of the method invocation
1631       * @return The serializable return value from the invocation
1632       */

1633      public Object JavaDoc handle(Message req)
1634      {
1635         Object JavaDoc body = null;
1636         Object JavaDoc retval = null;
1637         MethodCall method_call = null;
1638         boolean trace = log.isTraceEnabled();
1639         
1640         if( trace )
1641            log.trace("Partition " + getPartitionName() + " received msg");
1642         if(req == null || req.getBuffer() == null)
1643         {
1644            log.warn("message or message buffer is null !");
1645            return null;
1646         }
1647         
1648         try
1649         {
1650            body = objectFromByteBuffer(req.getBuffer());
1651         }
1652         catch(Exception JavaDoc e)
1653         {
1654            log.warn("failed unserializing message buffer (msg=" + req + ")", e);
1655            return null;
1656         }
1657         
1658         if(body == null || !(body instanceof MethodCall))
1659         {
1660            log.warn("message does not contain a MethodCall object !");
1661            return null;
1662         }
1663         
1664         // get method call informations
1665
//
1666
method_call = (MethodCall)body;
1667         String JavaDoc methodName = method_call.getName();
1668         
1669         if( trace )
1670            log.trace("pre methodName: " + methodName);
1671         
1672         int idx = methodName.lastIndexOf('.');
1673         String JavaDoc handlerName = methodName.substring(0, idx);
1674         String JavaDoc newMethodName = methodName.substring(idx + 1);
1675         
1676         if( trace )
1677         {
1678            log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
1679            log.trace("Handle: " + methodName);
1680         }
1681         
1682         // prepare method call
1683
method_call.setName(newMethodName);
1684         Object JavaDoc handler = rpcHandlers.get(handlerName);
1685         if (handler == null)
1686         {
1687            if( trace )
1688               log.debug("No rpc handler registered under: "+handlerName);
1689            return new NoHandlerForRPC();
1690         }
1691
1692         /* Invoke it and just return any exception with trace level logging of
1693         the exception. The exception semantics of a group rpc call are weak as
1694         the return value may be a normal return value or the exception thrown.
1695         */

1696         try
1697         {
1698            retval = method_call.invoke(handler);
1699            if( trace )
1700               log.trace("rpc call return value: "+retval);
1701         }
1702         catch (Throwable JavaDoc t)
1703         {
1704            if( trace )
1705               log.trace("rpc call threw exception", t);
1706            retval = t;
1707         }
1708         
1709         return retval;
1710      }
1711      
1712   }
1713   
1714   private void setupLoggers(String JavaDoc partitionName)
1715   {
1716      if (partitionName == null)
1717      {
1718         this.log = Logger.getLogger(HAPartition.class.getName());
1719         this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle");
1720      }
1721      else
1722      {
1723         this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
1724         this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
1725      }
1726   }
1727   
1728}
1729
Popular Tags