KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > catalina > cluster > mcast > McastServiceImpl


1 /*
2  * Copyright 1999,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.catalina.cluster.mcast;
18
/**
 * A <b>membership</b> implementation using simple multicast.
 * This is the representation of a multicast membership service.
 * This class is responsible for maintaining a list of the active nodes in the cluster.
 * If a node fails to send out a heartbeat, the node will be dismissed.
 * This is the low-level implementation that handles the multicast sockets.
 * TODO: this could use java.nio so that a single thread both sends and receives,
 * or simply use a timeout on the receive.
 * @author Filip Hanik
 * @version $Revision: 1.14 $, $Date: 2005/02/10 20:44:37 $
 */

30
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketTimeoutException;

import org.apache.catalina.cluster.MembershipListener;
36 public class McastServiceImpl
37 {
38     private static org.apache.commons.logging.Log log =
39         org.apache.commons.logging.LogFactory.getLog( McastService.class );
40     /**
41      * Internal flag used for the listen thread that listens to the multicasting socket.
42      */

43     protected boolean doRun = false;
44     /**
45      * Socket that we intend to listen to
46      */

47     protected MulticastSocket JavaDoc socket;
48     /**
49      * The local member that we intend to broad cast over and over again
50      */

51     protected McastMember member;
52     /**
53      * The multicast address
54      */

55     protected InetAddress JavaDoc address;
56     /**
57      * The multicast port
58      */

59     protected int port;
60     /**
61      * The time it takes for a member to expire.
62      */

63     protected long timeToExpiration;
64     /**
65      * How often to we send out a broadcast saying we are alive, must be smaller than timeToExpiration
66      */

67     protected long sendFrequency;
68     /**
69      * Reuse the sendPacket, no need to create a new one everytime
70      */

71     protected DatagramPacket JavaDoc sendPacket;
72     /**
73      * Reuse the receivePacket, no need to create a new one everytime
74      */

75     protected DatagramPacket JavaDoc receivePacket;
76     /**
77      * The membership, used so that we calculate memberships when they arrive or don't arrive
78      */

79     protected McastMembership membership;
80     /**
81      * The actual listener, for callback when shits goes down
82      */

83     protected MembershipListener service;
84     /**
85      * Thread to listen for pings
86      */

87     protected ReceiverThread receiver;
88     /**
89      * Thread to send pings
90      */

91     protected SenderThread sender;
92
93     /**
94      * When was the service started
95      */

96     protected long serviceStartTime = System.currentTimeMillis();
97     
98     protected int mcastTTL = -1;
99     protected int mcastSoTimeout = -1;
100     protected InetAddress JavaDoc mcastBindAddress = null;
101
102     /**
103      * Create a new mcast service impl
104      * @param member - the local member
105      * @param sendFrequency - the time (ms) in between pings sent out
106      * @param expireTime - the time (ms) for a member to expire
107      * @param port - the mcast port
108      * @param bind - the bind address (not sure this is used yet)
109      * @param mcastAddress - the mcast address
110      * @param service - the callback service
111      * @throws IOException
112      */

113     public McastServiceImpl(
114         McastMember member,
115         long sendFrequency,
116         long expireTime,
117         int port,
118         InetAddress JavaDoc bind,
119         InetAddress JavaDoc mcastAddress,
120         int ttl,
121         int soTimeout,
122         MembershipListener service)
123     throws IOException JavaDoc {
124         this.member = member;
125         address = mcastAddress;
126         this.port = port;
127         this.mcastSoTimeout = soTimeout;
128         this.mcastTTL = ttl;
129         this.mcastBindAddress = bind;
130         setupSocket();
131         sendPacket = new DatagramPacket JavaDoc(new byte[1000],1000);
132         sendPacket.setAddress(address);
133         sendPacket.setPort(port);
134         receivePacket = new DatagramPacket JavaDoc(new byte[1000],1000);
135         receivePacket.setAddress(address);
136         receivePacket.setPort(port);
137         membership = new McastMembership(member.getName());
138         timeToExpiration = expireTime;
139         this.service = service;
140         this.sendFrequency = sendFrequency;
141     }
142     
143     protected void setupSocket() throws IOException JavaDoc {
144         if (mcastBindAddress != null) socket = new MulticastSocket JavaDoc(new java.net.
145
JavaDoc            InetSocketAddress(mcastBindAddress, port));
146         else socket = new MulticastSocket JavaDoc(port);
147         if (mcastBindAddress != null) {
148             if(log.isInfoEnabled())
149                 log.info("Setting multihome multicast interface to:" +
150                          mcastBindAddress);
151             socket.setInterface(mcastBindAddress);
152         } //end if
153
if ( mcastSoTimeout >= 0 ) {
154             if(log.isInfoEnabled())
155                 log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout);
156             socket.setSoTimeout(mcastSoTimeout);
157         }
158         if ( mcastTTL >= 0 ) {
159             if(log.isInfoEnabled())
160                 log.info("Setting cluster mcast TTL to " + mcastTTL);
161             socket.setTimeToLive(mcastTTL);
162         }
163     }
164
165     /**
166      * Start the service
167      * @param level 1 starts the receiver, level 2 starts the sender
168      * @throws IOException if the service fails to start
169      * @throws IllegalStateException if the service is already started
170      */

171     public synchronized void start(int level) throws IOException JavaDoc {
172         if ( sender != null && receiver != null ) throw new IllegalStateException JavaDoc("Service already running.");
173         if ( level == 1 ) {
174             socket.joinGroup(address);
175             doRun = true;
176             receiver = new ReceiverThread();
177             receiver.setDaemon(true);
178             receiver.start();
179         }
180         if ( level==2 ) {
181             serviceStartTime = System.currentTimeMillis();
182             sender = new SenderThread(sendFrequency);
183             sender.setDaemon(true);
184             sender.start();
185             
186         }
187     }
188
189     /**
190      * Stops the service
191      * @throws IOException if the service fails to disconnect from the sockets
192      */

193     public synchronized void stop() throws IOException JavaDoc {
194         socket.leaveGroup(address);
195         doRun = false;
196         sender = null;
197         receiver = null;
198         serviceStartTime = Long.MAX_VALUE;
199     }
200
201     /**
202      * Receive a datagram packet, locking wait
203      * @throws IOException
204      */

205     public void receive() throws IOException JavaDoc {
206         socket.receive(receivePacket);
207         byte[] data = new byte[receivePacket.getLength()];
208         System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length);
209         McastMember m = McastMember.getMember(data);
210         if ( membership.memberAlive(m) ) {
211             service.memberAdded(m);
212         }
213         McastMember[] expired = membership.expire(timeToExpiration);
214         for ( int i=0; i<expired.length; i++)
215             service.memberDisappeared(expired[i]);
216     }
217
218     /**
219      * Send a ping
220      * @throws Exception
221      */

222     public void send() throws Exception JavaDoc{
223         member.inc();
224         byte[] data = member.getData(this.serviceStartTime);
225         DatagramPacket JavaDoc p = new DatagramPacket JavaDoc(data,data.length);
226         p.setAddress(address);
227         p.setPort(port);
228         socket.send(p);
229     }
230
231     public long getServiceStartTime() {
232        return this.serviceStartTime;
233     }
234
235
236     public class ReceiverThread extends Thread JavaDoc {
237         public ReceiverThread() {
238             super();
239             setName("Cluster-MembershipReceiver");
240         }
241         public void run() {
242             while ( doRun ) {
243                 try {
244                     receive();
245                 } catch ( Exception JavaDoc x ) {
246                     log.warn("Error receiving mcast package. Sleeping 500ms",x);
247                     try { Thread.sleep(500); } catch ( Exception JavaDoc ignore ){}
248                     
249                 }
250             }
251         }
252     }//class ReceiverThread
253

254     public class SenderThread extends Thread JavaDoc {
255         long time;
256         public SenderThread(long time) {
257             this.time = time;
258             setName("Cluster-MembershipSender");
259
260         }
261         public void run() {
262             while ( doRun ) {
263                 try {
264                     send();
265                 } catch ( Exception JavaDoc x ) {
266                     log.warn("Unable to send mcast message.",x);
267                 }
268                 try { Thread.sleep(time); } catch ( Exception JavaDoc ignore ) {}
269             }
270         }
271     }//class SenderThread
272
}
273
Popular Tags