KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > go > teaservlet > util > cluster > ClusterManager


1 /* ====================================================================
2  * TeaServlet - Copyright (c) 1999-2000 Walt Disney Internet Group
3  * ====================================================================
4  * The Tea Software License, Version 1.1
5  *
6  * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Walt Disney Internet Group (http://opensource.go.com/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must
28  * not be used to endorse or promote products derived from this
29  * software without prior written permission. For written
30  * permission, please contact opensource@dig.com.
31  *
32  * 5. Products derived from this software may not be called "Tea",
33  * "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet",
34  * "Kettle", "Trove" or "BeanDoc" appear in their name, without prior
35  * written permission of the Walt Disney Internet Group.
36  *
37  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
38  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
39  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
40  * DISCLAIMED. IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS
41  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
44  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
45  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
46  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
47  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48  * ====================================================================
49  *
50  * For more information about Tea, please see http://opensource.go.com/.
51  */

52  
53 package com.go.teaservlet.util.cluster;
54
55 import java.io.*;
56 import java.net.*;
57 import java.rmi.Naming JavaDoc;
58 import java.rmi.NoSuchObjectException JavaDoc;
59 import java.rmi.RemoteException JavaDoc;
60 import java.rmi.server.RemoteServer JavaDoc;
61 import java.rmi.registry.LocateRegistry JavaDoc;
62 import java.rmi.registry.Registry JavaDoc;
63 import java.rmi.AlreadyBoundException JavaDoc;
64 import java.util.StringTokenizer JavaDoc;
65 import java.util.Iterator JavaDoc;
66 import java.util.Collection JavaDoc;
67 import java.util.Collections JavaDoc;
68 import java.util.ArrayList JavaDoc;
69 import java.util.Vector JavaDoc;
70 import com.go.trove.util.PropertyMap;
71 import com.go.trove.log.Syslog;
72
73
74 /******************************************************************************
75  *
76  * @author Jonathan Colwell
77  * @version
78  * <!--$$Revision:--> 14 <!-- $-->, <!--$$JustDate:--> 7/27/01 <!-- $-->
79  */

80 public class ClusterManager {
81
82     /**
83      * Provides an easy way to create a cluster manager when provided with a
84      * PropertyMap containing some of the following properties, and an
85      * implementation of the Clustered interface. In order to use the
86      * multicast discovery feature, launchAuto() should be called on the
87      * returned instance. killAuto() should also be called when finished
88      * with the manager.
89      * <pre>
90      * cluster {
91      * servers = cd-test0;cd-test1
92      * name = FooBar
93      * localNet = 10.192.1.0/24
94      * rmi.port = 1099
95      * multicast {
96      * port = 1099
97      * group = 224.0.1.20
98      * }
99      * }</pre>
100      *
101      */

102
103     private static int cActiveRegistryPort = -1;
104      
105     public static ClusterManager createClusterManager(PropertyMap properties,
106                                                       Clustered clusterObj)
107         throws Exception JavaDoc {
108
109         ClusterManager manager = null;
110
111         if (properties != null) {
112             properties = properties.subMap("cluster");
113
114             if (properties.keySet().size() > 0) {
115                 String JavaDoc servers = properties
116                     .getString("servers");
117                 String JavaDoc clusterName = properties
118                     .getString("name");
119                 InetAddress multicastGroup = null;
120                 String JavaDoc netInterface = properties
121                     .getString("localNet");
122
123                 int rmiPort = properties.getInt("rmi.port", 1099);
124                     
125                 int multicastPort = properties.getInt("multicast.port", 1099);
126                 String JavaDoc group = properties.getString("multicast.group");
127                 if (group != null) {
128                     try {
129                         multicastGroup = InetAddress.getByName(group);
130                     }
131                     catch (UnknownHostException uhe) {}
132                 }
133                 
134                 if (multicastGroup != null) {
135                     manager = new ClusterManager(clusterObj,
136                                                  multicastGroup,
137                                                  multicastPort,
138                                                  rmiPort,
139                                                  netInterface,
140                                                  servers);
141                 }
142                 else if (servers != null) {
143                     manager = new ClusterManager(clusterObj,
144                                                  rmiPort,
145                                                  servers);
146                 }
147             }
148         }
149         return manager;
150     }
151
152     private boolean DEBUG = true;
153     private Vector JavaDoc mUnresolvedServerNames;
154     private final Vector JavaDoc mExplicitlySpecifiedServerNames;
155     private MulticastSocket mSock;
156     private InetAddress mMultiGroup;
157     private String JavaDoc mHostName;
158     private Clustered mCluster;
159     private int mMultiPort, mRmiPort;
160     private Registry JavaDoc mLocalRegistry;
161     private AutomaticClusterManagementThread mAuto;
162
163     /**
164      * Convenience method for users of the Restartable interface within the
165      * TeaServletClusterHook.
166      */

167     public ClusterManager(Restartable restartableObj,String JavaDoc clusterName,
168                           String JavaDoc serverName,InetAddress multicastGroup,
169                           int multicastPort,int rmiPort,String JavaDoc netInterface,
170                           String JavaDoc serverNames)
171         throws IOException,RemoteException JavaDoc {
172
173         this(new TeaServletClusterHook(restartableObj,clusterName,serverName),
174              multicastGroup,multicastPort,rmiPort,netInterface,serverNames);
175     }
176
177     /**
178      * Creates a cluster manager that will use multicast discovery to find
179      * peers with similar configurations.
180      */

181     public ClusterManager(Clustered cluster,InetAddress multicastGroup,
182                           int multicastPort,int rmiPort) throws IOException {
183         this(cluster,multicastGroup,
184              multicastPort,rmiPort,null,null);
185     }
186
187
188     /**
189      * Creates a cluster manager that will use multicast discovery as well as
190      * a list of server names to find peers in this cluster.
191      */

192     public ClusterManager(Clustered cluster,InetAddress multicastGroup,
193                           int multicastPort,int rmiPort,
194                           String JavaDoc netInterface,String JavaDoc serverNames)
195         throws IOException {
196
197         this(cluster,rmiPort,serverNames);
198         mMultiGroup = multicastGroup;
199         mMultiPort = multicastPort;
200         setUpMulticast(multicastGroup,multicastPort,
201                        cluster.getClusterName(),
202                        cluster.getServerName(),netInterface);
203     }
204
205     /**
206      * Convenience method for users of the Restartable interface within the
207      * TeaServletClusterHook.
208      */

209     public ClusterManager(Restartable restartableObj,String JavaDoc clusterName,
210                           String JavaDoc serverName,int rmiPort,String JavaDoc serverNames)
211         throws IOException {
212
213         this(new TeaServletClusterHook(restartableObj,clusterName,serverName),
214              rmiPort,serverNames);
215     }
216
217     /**
218      * Allows the cluster to be defined explicitly by providing names rather
219      * than by using multicast discovery.
220      */

221     public ClusterManager(Clustered cluster,int rmiPort,String JavaDoc serverNames)
222         throws IOException {
223
224         if (rmiPort > 0) {
225             mRmiPort = rmiPort;
226         }
227         else {
228             mRmiPort = 1099;
229         }
230
231         mCluster = cluster;
232         mLocalRegistry = prepareRegistry(cluster,rmiPort);
233         mExplicitlySpecifiedServerNames = new Vector JavaDoc();
234         if (serverNames != null) {
235             StringTokenizer JavaDoc st = new StringTokenizer JavaDoc(serverNames,",; ");
236             while (st.hasMoreTokens()) {
237                 mExplicitlySpecifiedServerNames
238                     .add(st.nextToken().toLowerCase());
239             }
240         }
241         mUnresolvedServerNames = (Vector JavaDoc)mExplicitlySpecifiedServerNames.clone();
242     }
243
244     public String JavaDoc[] resolveServerNames() {
245
246         /* check if any unresolved servers have come online */
247         synchronized (mUnresolvedServerNames) {
248             
249             Iterator JavaDoc resolveIt = mUnresolvedServerNames.iterator();
250             while (resolveIt.hasNext()) {
251                 String JavaDoc nextHost = (String JavaDoc)resolveIt.next();
252                 try {
253                     String JavaDoc namingURL =
254                         ("//" + nextHost + ':' + mRmiPort
255                          + '/' + mCluster.getClusterName());
256
257                     if (DEBUG) {
258                         Syslog.debug("Looking Up " + namingURL);
259                     }
260
261                     Clustered bcl =
262                         (Clustered)Naming.lookup(namingURL);
263                                     
264                     if (bcl != null) {
265                         if (!mCluster.containsPeer(bcl)) {
266                             resolveIt.remove();
267                             mCluster.addPeer(bcl);
268                             Syslog.debug("Successfullly resolved "
269                                                + bcl.getServerName() +
270                                                " as a member of the "
271                                                + mCluster.getClusterName()
272                                                + " cluster.");
273                         }
274                     }
275                     continue;
276                 }
277                 catch (Exception JavaDoc e) {
278                     Syslog.debug(e);
279                 }
280                 Syslog.debug("Failed to resolve "
281                                    + nextHost + " as part of this cluster");
282             }
283         }
284                    
285             /* check if any servers disappeared */
286             try {
287                 Clustered[] peers = getCluster().getKnownPeers();
288                 ArrayList JavaDoc peerNames = new ArrayList JavaDoc();
289                 boolean lostOne = false;
290                 for (int j = 0;j<peers.length;j++) {
291                     try {
292                         peerNames.add(peers[j].getServerName());
293                     }
294                     catch (RemoteException JavaDoc re) {
295                         getCluster().removePeer(peers[j]);
296                         lostOne = true;
297                     }
298                 }
299                 if (lostOne) {
300                     mUnresolvedServerNames =
301                         (Vector JavaDoc)mExplicitlySpecifiedServerNames.clone();
302                     mUnresolvedServerNames.removeAll(peerNames);
303                 }
304                 return (String JavaDoc[])peerNames.toArray(new String JavaDoc[peerNames.size()]);
305             }
306             catch (RemoteException JavaDoc re2) {
307                 Syslog.debug(re2);
308             }
309         
310         return null;
311     }
312             
313
314     public Clustered getCluster() {
315         return mCluster;
316     }
317
318     public int getRMIPort() {
319         return mRmiPort;
320     }
321
322     public int getMulticastPort() {
323         return mMultiPort;
324     }
325
326     public InetAddress getMulticastGroup() {
327         return mMultiGroup;
328     }
329
330     public void send(byte[] msg) throws IOException {
331
332         try {
333             mSock.send(new DatagramPacket(msg,
334                                           msg.length,
335                                           mMultiGroup,
336                                           mMultiPort));
337         }
338         catch (SecurityException JavaDoc se) {
339             se.printStackTrace();
340         }
341     }
342
343     public DatagramPacket getNextPacket() throws IOException {
344
345         DatagramPacket pack = new DatagramPacket(new byte[1024],1024);
346         try {
347             mSock.receive(pack);
348         }
349         catch (SecurityException JavaDoc se) {
350             se.printStackTrace();
351         }
352         return pack;
353     }
354
355     /**
356      * Launches an AutomaticClusterManagementThread in active mode.
357      */

358     public void launchAuto() {
359         launchAuto(true);
360     }
361
362     /**
363      * Allows the management thread to passively take part in the cluster
364      * operations.
365      * Other cluster members will not be made aware of this instance.
366      */

367     public void launchAuto(boolean active) {
368         killAuto();
369         if (mSock != null) {
370             try {
371                 mAuto = new AutomaticClusterManagementThread(this, mCluster
372                                                              .getClusterName(),
373                                                              active);
374             }
375             catch (Exception JavaDoc e) {
376                 mAuto = new AutomaticClusterManagementThread(this, active);
377             }
378             if (mAuto != null) {
379                 mAuto.start();
380             }
381         }
382     }
383
384     /**
385      * permits the AutomaticClusterManagementThread to be subclassed
386      * and used into the ClusterManager.
387      */

388     public void launchAuto(AutomaticClusterManagementThread auto) {
389         killAuto();
390         if (mSock != null) {
391             mAuto = auto;
392             if (mAuto != null) {
393                 mAuto.start();
394             }
395         }
396     }
397
398     public void killAuto() {
399         if (mAuto != null) {
400             mAuto.kill();
401         }
402         mAuto = null;
403     }
404
405     public void joinCluster() throws IOException {
406
407         send(("join~" + getCluster().getClusterName()
408               + '~' + getHostName()).getBytes());
409     }
410
411     public void pingCluster() throws IOException {
412
413         send(("ping~" + getCluster().getClusterName()
414               + '~' + getHostName()).getBytes());
415     }
416
417     public String JavaDoc getHostName() throws IOException {
418
419         if (mHostName == null) {
420             mHostName = InetAddress.getLocalHost().getHostName();
421         }
422         return mHostName;
423     }
424
425     public void destroy() throws IOException {
426         killAuto();
427         if (mSock != null) {
428             send(("leave~" + mCluster.getClusterName()
429                   + '~' + getHostName()).getBytes());
430             mSock.leaveGroup(mMultiGroup);
431             mSock.close();
432             mSock = null;
433         }
434     }
435
436     protected Registry JavaDoc prepareRegistry(Clustered cluster,int port)
437         throws IOException {
438
439         Registry JavaDoc reg = null;
440
441         if (cActiveRegistryPort < 0) {
442             try {
443                 reg = LocateRegistry.createRegistry(port);
444                 cActiveRegistryPort = port;
445             }
446             catch (Exception JavaDoc e) {
447                 reg = null;
448                 cActiveRegistryPort = -1;
449                 e.printStackTrace();
450             }
451         }
452         else {
453             if (cActiveRegistryPort != port) {
454                 Syslog.warn("An active RMI registry already exists on "
455                                    + cActiveRegistryPort
456                                    + " this port will be used in lieu of port "
457                                    + port + ".");
458             }
459
460             port = cActiveRegistryPort;
461             try {
462                 reg = LocateRegistry.getRegistry(port);
463                 reg.list();
464             }
465             catch (Exception JavaDoc e) {
466                 reg = null;
467             }
468         }
469
470         if (reg != null) {
471             try {
472                 reg.bind(cluster.getClusterName(),cluster);
473             }
474             catch (AlreadyBoundException JavaDoc abe) {
475                 reg.rebind(cluster.getClusterName(),cluster);
476             }
477             catch (NoSuchObjectException JavaDoc nsoe) {
478                 Syslog.warn(nsoe);
479                 if (nsoe.detail != null) {
480                     Syslog.warn(nsoe.detail);
481                 }
482                 else {
483                     Syslog.warn("No detail available");
484                 }
485             }
486
487             Syslog.info(cluster.getClusterName() + " bound on " +
488                                    cluster.getServerName() + ':' + port);
489         }
490         else {
491             throw new IOException("Failed to connect to a valid RMI registry");
492         }
493         return reg;
494     }
495
496     protected void setUpMulticast(InetAddress group,int port,
497                         String JavaDoc clusterName) throws IOException {
498         setUpMulticast(group,port,clusterName,null,null);
499     }
500
501     protected void setUpMulticast(InetAddress group,int port,
502                         String JavaDoc clusterName,String JavaDoc host)
503         throws IOException {
504         
505         setUpMulticast(group,port,clusterName,host,null);
506     }
507
508
509
510     protected void setUpMulticast(InetAddress group,int port,
511                         String JavaDoc clusterName,String JavaDoc host,String JavaDoc netInterface)
512         throws IOException {
513
514         if (DEBUG) { Syslog.debug("Setting up Multicast");}
515
516         if (mSock == null) {
517             mSock = new MulticastSocket(port);
518
519             if (host == null) {
520                 host = getHostName();
521             }
522             else {
523                 mHostName = host;
524             }
525
526             //see which interface we're using.
527
InetAddress interf = mSock.getInterface();
528
529             InetAddress[] addresses = InetAddress.getAllByName(host);
530     
531             if (DEBUG) {
532                 Syslog.debug("addresses on this host.");
533                 for (int j = 0;j<addresses.length;j++) {
534                     Syslog.debug(addresses[j].getHostAddress());
535                 }
536                 
537                 Syslog.debug("current interface IP: "
538                                    + interf.getHostAddress());
539             }
540
541             if (netInterface != null) {
542                 try {
543                     byte[] mask = {(byte)255,(byte)255,(byte)255,(byte)0};
544                     int slashindex = -1;
545                     if ((slashindex = netInterface.indexOf('/')) >= 0) {
546                         int maskID = Integer
547                             .parseInt(netInterface.substring(slashindex+1));
548                         netInterface = netInterface.substring(0,slashindex);
549                         slashindex = (0x80000000 >> maskID-1);
550                         mask[3] = (byte)(slashindex & 0xFF);
551                         mask[2] = (byte)((slashindex >> 8) & 0xFF);
552                         mask[1] = (byte)((slashindex >> 16) & 0xFF);
553                         mask[0] = (byte)((slashindex >> 24) & 0xFF);
554                     }
555
556                     StringTokenizer JavaDoc st = new StringTokenizer JavaDoc(netInterface," .");
557                     if (st.countTokens() == 4) {
558                         byte[] maskedNet = new byte[4];
559                         for(int k=0; k<4;k++) {
560                             String JavaDoc token = st.nextToken();
561                             maskedNet[k] = (byte)(Integer.parseInt(token)
562                                                   & mask[k]);
563                         }
564                         if (DEBUG) {
565                             Syslog.debug("net: "
566                                            + maskedNet[0]
567                                            + "." + maskedNet[1]
568                                            + "." + maskedNet[2]
569                                            + "." + maskedNet[3]);
570                         }
571
572                         //pick the address that is on the localNet
573
for (int j=0;j<addresses.length;j++) {
574                             byte[] testAddress = addresses[j].getAddress();
575                             Syslog.debug("testing: "
576                                                + addresses[j].getHostAddress());
577                             if (maskedNet[0] == (testAddress[0] & mask[0])
578                                 && maskedNet[1] == (testAddress[1] & mask[1])
579                                 && maskedNet[2] == (testAddress[2] & mask[2])
580                                 && maskedNet[3] == (testAddress[3] & mask[3])) {
581                             
582                                 mSock.setInterface(addresses[j]);
583                                 //change the host to make sure it's on the bc
584
host = addresses[j].getHostAddress();
585                                 mHostName = host;
586
587                                 Syslog.debug("new interface IP: "
588                                                  + addresses[j]
589                                                  .getHostAddress());
590
591                                 break;
592                             }
593                         }
594                     }
595                 }
596                 catch (Exception JavaDoc e) {
597                     e.printStackTrace();
598                 }
599             }
600
601             mSock.setSendBufferSize(1024);
602             mSock.setReceiveBufferSize(1024);
603             mSock.setTimeToLive(1);
604             mSock.joinGroup(group);
605         }
606     }
607     
608     /**
609      * converts an array of four bytes to a long.
610      * @return the converted bytes as a lon or -1 on error.
611      */

612     public long convertIPBytes(byte[] ipBytes) {
613         if (ipBytes.length == 4) {
614             long ipLong = (((long)ipBytes[0]) << 24);
615             ipLong |= (((long)ipBytes[1]) << 16);
616             ipLong |= (((long)ipBytes[2]) << 8);
617             ipLong |= (long)ipBytes[3];
618             return ipLong;
619         }
620         return -1;
621     }
622
623     /**
624      * turns a dot delimited IP address string into a long.
625      */

626     public long convertIPString(String JavaDoc ip) throws NumberFormatException JavaDoc,
627     IllegalArgumentException JavaDoc {
628
629         StringTokenizer JavaDoc st = new StringTokenizer JavaDoc(ip,".");
630         if (st.countTokens() == 4) {
631             long ipLong = (Long.parseLong(st.nextToken()) << 24);
632             ipLong += (Long.parseLong(st.nextToken()) << 16);
633             ipLong += (Long.parseLong(st.nextToken()) << 8);
634             ipLong += Long.parseLong(st.nextToken());
635             return ipLong;
636         }
637         else {
638             throw new IllegalArgumentException JavaDoc("Invalid IP string");
639         }
640     }
641
642     /**
643      * converts a long to a dot delimited IP address string.
644      */

645     public String JavaDoc convertIPBackToString(long ip) {
646         StringBuffer JavaDoc sb = new StringBuffer JavaDoc(16);
647         sb.append(Long.toString((ip >> 24)& 0xFF));
648         sb.append('.');
649         sb.append(Long.toString((ip >> 16)& 0xFF));
650         sb.append('.');
651         sb.append(Long.toString((ip >> 8) & 0xFF));
652         sb.append('.');
653         sb.append(Long.toString(ip & 0xFF));
654         return sb.toString();
655     }
656 }
657
658
Popular Tags