1 16 17 package org.apache.catalina.cluster.mcast; 18 19 30 31 import java.net.MulticastSocket ; 32 import java.io.IOException ; 33 import java.net.InetAddress ; 34 import java.net.DatagramPacket ; 35 import org.apache.catalina.cluster.MembershipListener; 36 public class McastServiceImpl 37 { 38 private static org.apache.commons.logging.Log log = 39 org.apache.commons.logging.LogFactory.getLog( McastService.class ); 40 43 protected boolean doRun = false; 44 47 protected MulticastSocket socket; 48 51 protected McastMember member; 52 55 protected InetAddress address; 56 59 protected int port; 60 63 protected long timeToExpiration; 64 67 protected long sendFrequency; 68 71 protected DatagramPacket sendPacket; 72 75 protected DatagramPacket receivePacket; 76 79 protected McastMembership membership; 80 83 protected MembershipListener service; 84 87 protected ReceiverThread receiver; 88 91 protected SenderThread sender; 92 93 96 protected long serviceStartTime = System.currentTimeMillis(); 97 98 protected int mcastTTL = -1; 99 protected int mcastSoTimeout = -1; 100 protected InetAddress mcastBindAddress = null; 101 102 113 public McastServiceImpl( 114 McastMember member, 115 long sendFrequency, 116 long expireTime, 117 int port, 118 InetAddress bind, 119 InetAddress mcastAddress, 120 int ttl, 121 int soTimeout, 122 MembershipListener service) 123 throws IOException { 124 this.member = member; 125 address = mcastAddress; 126 this.port = port; 127 this.mcastSoTimeout = soTimeout; 128 this.mcastTTL = ttl; 129 this.mcastBindAddress = bind; 130 setupSocket(); 131 sendPacket = new DatagramPacket (new byte[1000],1000); 132 sendPacket.setAddress(address); 133 sendPacket.setPort(port); 134 receivePacket = new DatagramPacket (new byte[1000],1000); 135 receivePacket.setAddress(address); 136 receivePacket.setPort(port); 137 membership = new McastMembership(member.getName()); 138 timeToExpiration = expireTime; 139 this.service = service; 140 this.sendFrequency = sendFrequency; 141 } 142 143 protected void setupSocket() throws IOException { 144 if (mcastBindAddress != null) socket = new MulticastSocket (new java.net. 145 InetSocketAddress(mcastBindAddress, port)); 146 else socket = new MulticastSocket (port); 147 if (mcastBindAddress != null) { 148 if(log.isInfoEnabled()) 149 log.info("Setting multihome multicast interface to:" + 150 mcastBindAddress); 151 socket.setInterface(mcastBindAddress); 152 } if ( mcastSoTimeout >= 0 ) { 154 if(log.isInfoEnabled()) 155 log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout); 156 socket.setSoTimeout(mcastSoTimeout); 157 } 158 if ( mcastTTL >= 0 ) { 159 if(log.isInfoEnabled()) 160 log.info("Setting cluster mcast TTL to " + mcastTTL); 161 socket.setTimeToLive(mcastTTL); 162 } 163 } 164 165 171 public synchronized void start(int level) throws IOException { 172 if ( sender != null && receiver != null ) throw new IllegalStateException ("Service already running."); 173 if ( level == 1 ) { 174 socket.joinGroup(address); 175 doRun = true; 176 receiver = new ReceiverThread(); 177 receiver.setDaemon(true); 178 receiver.start(); 179 } 180 if ( level==2 ) { 181 serviceStartTime = System.currentTimeMillis(); 182 sender = new SenderThread(sendFrequency); 183 sender.setDaemon(true); 184 sender.start(); 185 186 } 187 } 188 189 193 public synchronized void stop() throws IOException { 194 socket.leaveGroup(address); 195 doRun = false; 196 sender = null; 197 receiver = null; 198 serviceStartTime = Long.MAX_VALUE; 199 } 200 201 205 public void receive() throws IOException { 206 socket.receive(receivePacket); 207 byte[] data = new byte[receivePacket.getLength()]; 208 System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length); 209 McastMember m = McastMember.getMember(data); 210 if ( membership.memberAlive(m) ) { 211 service.memberAdded(m); 212 } 213 McastMember[] expired = membership.expire(timeToExpiration); 214 for ( int i=0; i<expired.length; i++) 215 service.memberDisappeared(expired[i]); 216 } 217 218 222 public void send() throws Exception { 223 member.inc(); 224 byte[] data = member.getData(this.serviceStartTime); 225 DatagramPacket p = new DatagramPacket (data,data.length); 226 p.setAddress(address); 227 p.setPort(port); 228 socket.send(p); 229 } 230 231 public long getServiceStartTime() { 232 return this.serviceStartTime; 233 } 234 235 236 public class ReceiverThread extends Thread { 237 public ReceiverThread() { 238 super(); 239 setName("Cluster-MembershipReceiver"); 240 } 241 public void run() { 242 while ( doRun ) { 243 try { 244 receive(); 245 } catch ( Exception x ) { 246 log.warn("Error receiving mcast package. Sleeping 500ms",x); 247 try { Thread.sleep(500); } catch ( Exception ignore ){} 248 249 } 250 } 251 } 252 } 254 public class SenderThread extends Thread { 255 long time; 256 public SenderThread(long time) { 257 this.time = time; 258 setName("Cluster-MembershipSender"); 259 260 } 261 public void run() { 262 while ( doRun ) { 263 try { 264 send(); 265 } catch ( Exception x ) { 266 log.warn("Unable to send mcast message.",x); 267 } 268 try { Thread.sleep(time); } catch ( Exception ignore ) {} 269 } 270 } 271 }} 273 | Popular Tags |