package org.apache.activemq.transport.discovery.multicast;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;

import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A {@link DiscoveryAgent} that advertises its own service and discovers
 * remote services by exchanging datagrams on a multicast socket.
 *
 * <p>Every datagram sent by this agent has the form
 * <code>&lt;group&gt;.ActiveMQ-4.(alive.|dead.)%&lt;brokerName&gt;%&lt;service&gt;</code>
 * (see {@link #getType()} and <code>doAdvertizeSelf()</code>). A receive loop
 * running on a daemon thread parses incoming datagrams of the same form,
 * tracks the time of the last "alive" message per service, and reports
 * additions and removals to the configured {@link DiscoveryListener} through
 * a single-threaded executor.
 */
public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {

    private static final Log log = LogFactory.getLog(MulticastDiscoveryAgent.class);

    /** URI used by {@link #start()} when no discovery URI has been set. */
    public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";

    private static final String TYPE_SUFFIX = "ActiveMQ-4.";
    private static final String ALIVE = "alive.";
    private static final String DEAD = "dead.";
    private static final String DELIMITER = "%";
    private static final int BUFF_SIZE = 8192;
    private static final int DEFAULT_IDLE_TIME = 500;

    /** A service is expired after this many keep-alive intervals without an "alive" message. */
    private static final int HEARTBEAT_MISS_BEFORE_DEATH = 4;

    private int timeToLive = 1;
    private boolean loopBackMode = false;

    /** Service name -> time (millis) of the last "alive" message seen for it. */
    private Map<String, AtomicLong> services = new ConcurrentHashMap<String, AtomicLong>();

    /** Service name -> broker name that announced it. */
    private Map<String, String> brokers = new ConcurrentHashMap<String, String>();

    private String group = "default";
    private String brokerName;
    private URI discoveryURI;
    private InetAddress inetAddress;
    private SocketAddress sockAddress;
    private DiscoveryListener discoveryListener;
    private String selfService;
    private MulticastSocket mcast;
    private Thread runner;
    private long keepAliveInterval = DEFAULT_IDLE_TIME;
    private long lastAdvertizeTime = 0;
    private AtomicBoolean started = new AtomicBoolean(false);

    /** Cleared after the first failed advertisement so the failure is logged only once. */
    private boolean reportAdvertizeFailed = true;

    /** Single daemon thread on which listener callbacks are delivered. */
    private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                public Thread newThread(Runnable runable) {
                    Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
                    t.setDaemon(true);
                    return t;
                }
            });

    /**
     * Sets the listener that is told about discovered and removed services.
     *
     * @param listener the listener; while it is {@code null} incoming packets are ignored
     */
    public void setDiscoveryListener(DiscoveryListener listener) {
        this.discoveryListener = listener;
    }

    /**
     * Records the service this agent advertises and, if the agent is already
     * started, advertises it immediately.
     *
     * @param name the service name to advertise
     * @throws IOException declared by the interface; not thrown by this implementation directly
     */
    public void registerService(String name) throws IOException {
        this.selfService = name;
        if (started.get()) {
            doAdvertizeSelf();
        }
    }

    /**
     * @return the discovery group name
     */
    public String getGroup() {
        return group;
    }

    /**
     * @param group the discovery group name; it prefixes every packet (see {@link #getType()})
     */
    public void setGroup(String group) {
        this.group = group;
    }

    /**
     * @return the broker name, as sanitised by {@link #setBrokerName(String)}
     */
    public String getBrokerName() {
        return brokerName;
    }

    /**
     * Sets the broker name, replacing the characters {@code '.'}, {@code ':'}
     * and {@code '%'} with {@code '-'}. A {@code null} argument is ignored.
     *
     * @param brokerName the broker name
     */
    public void setBrokerName(String brokerName) {
        if (brokerName != null) {
            brokerName = brokerName.replace('.', '-');
            brokerName = brokerName.replace(':', '-');
            // '%' is the packet delimiter and must not appear inside the name.
            brokerName = brokerName.replace('%', '-');
            this.brokerName = brokerName;
        }
    }

    /**
     * @return the value passed to {@link MulticastSocket#setLoopbackMode(boolean)} on start
     */
    public boolean isLoopBackMode() {
        return loopBackMode;
    }

    /**
     * @param loopBackMode the value passed to {@link MulticastSocket#setLoopbackMode(boolean)} on start
     */
    public void setLoopBackMode(boolean loopBackMode) {
        this.loopBackMode = loopBackMode;
    }

    /**
     * @return the multicast time-to-live applied to the socket on start
     */
    public int getTimeToLive() {
        return timeToLive;
    }

    /**
     * @param timeToLive the multicast time-to-live applied to the socket on start
     */
    public void setTimeToLive(int timeToLive) {
        this.timeToLive = timeToLive;
    }

    /**
     * @return the discovery URI, or {@code null} if none has been set and the agent was never started
     */
    public URI getDiscoveryURI() {
        return discoveryURI;
    }

    /**
     * @param discoveryURI the URI whose host and port give the multicast address and port
     */
    public void setDiscoveryURI(URI discoveryURI) {
        this.discoveryURI = discoveryURI;
    }

    /**
     * @return the keep-alive interval in milliseconds
     */
    public long getKeepAliveInterval() {
        return keepAliveInterval;
    }

    /**
     * @param keepAliveInterval the keep-alive interval in milliseconds; also used as the socket read timeout
     */
    public void setKeepAliveInterval(long keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    /**
     * Opens the multicast socket, joins the group, starts the receive thread
     * and advertises the registered service. Does nothing if already started.
     *
     * <p>If any step fails, the started flag is reset and any socket that was
     * opened is closed before the exception is rethrown, so that a later call
     * can try again.
     *
     * @throws Exception if the group is empty or the socket cannot be set up
     */
    public void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        try {
            if (group == null || group.length() == 0) {
                throw new IOException("You must specify a group to discover");
            }
            if (brokerName == null || brokerName.length() == 0) {
                log.warn("brokerName not set");
            }
            String type = getType();
            if (!type.endsWith(".")) {
                log.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
            }
            if (discoveryURI == null) {
                discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
            }
            this.inetAddress = InetAddress.getByName(discoveryURI.getHost());
            this.sockAddress = new InetSocketAddress(this.inetAddress, discoveryURI.getPort());
            mcast = new MulticastSocket(discoveryURI.getPort());
            mcast.setLoopbackMode(loopBackMode);
            mcast.setTimeToLive(getTimeToLive());
            mcast.joinGroup(inetAddress);
            // The read timeout lets the receive loop wake up to do its time keeping.
            mcast.setSoTimeout((int) keepAliveInterval);
            runner = new Thread(this);
            runner.setName("MulticastDiscovery: " + selfService);
            runner.setDaemon(true);
            runner.start();
            doAdvertizeSelf();
        } catch (Exception e) {
            // Undo the state change so the agent is not left "started" without a usable socket.
            started.set(false);
            if (mcast != null) {
                mcast.close();
            }
            throw e;
        }
    }

    /**
     * Sends a final "dead" advertisement and closes the socket. Does nothing
     * if the agent is not started.
     *
     * @throws Exception declared by the interface
     */
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            try {
                // started is now false, so this advertisement carries the "dead." marker.
                doAdvertizeSelf();
            } finally {
                mcast.close();
            }
        }
    }

    /**
     * @return the packet prefix for this agent's group: the group name followed by {@code ".ActiveMQ-4."}
     */
    public String getType() {
        return group + "." + TYPE_SUFFIX;
    }

    /**
     * Receive loop: while the agent is started, performs time keeping and then
     * waits for the next datagram, handing its text to the packet parser.
     */
    public void run() {
        byte[] buf = new byte[BUFF_SIZE];
        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
        while (started.get()) {
            doTimeKeepingServices();
            try {
                mcast.receive(packet);
                if (packet.getLength() > 0) {
                    String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
                    processData(str);
                }
            } catch (SocketTimeoutException se) {
                // Expected: the read timed out; loop round to do the time keeping.
            } catch (IOException e) {
                // An I/O error after stop() is the socket being closed; only report it while running.
                if (started.get()) {
                    log.error("failed to process packet: " + e);
                }
            }
        }
    }

    /**
     * Parses one received packet and dispatches it as an "alive" or "dead"
     * announcement. Packets are ignored when no listener is set, when they do
     * not start with this agent's type, when they carry neither marker, when
     * the two broker-name delimiters are missing, or when they name this
     * agent's own broker.
     *
     * @param str the packet text
     */
    private void processData(String str) {
        if (discoveryListener == null) {
            return;
        }
        String type = getType();
        if (!str.startsWith(type)) {
            return;
        }
        String payload = str.substring(type.length());

        boolean alive;
        String announcement;
        if (payload.startsWith(ALIVE)) {
            alive = true;
            announcement = payload.substring(ALIVE.length());
        } else if (payload.startsWith(DEAD)) {
            alive = false;
            announcement = payload.substring(DEAD.length());
        } else {
            ignoreMalformed(str);
            return;
        }

        // Expected shape: %<brokerName>%<service>
        int first = announcement.indexOf(DELIMITER);
        int second = first < 0 ? -1 : announcement.indexOf(DELIMITER, first + 1);
        if (second < 0) {
            // Packets come off the network; a bad one must not kill the receive thread.
            ignoreMalformed(str);
            return;
        }
        String remoteBroker = announcement.substring(first + 1, second);
        String service = announcement.substring(second + 1);

        if (!remoteBroker.equals(this.brokerName)) {
            if (alive) {
                processAlive(remoteBroker, service);
            } else {
                processDead(remoteBroker, service);
            }
        }
    }

    /** Logs, at debug level, a packet that was dropped because it could not be parsed. */
    private void ignoreMalformed(String str) {
        if (log.isDebugEnabled()) {
            log.debug("Ignoring malformed discovery packet: " + str);
        }
    }

    /**
     * Re-advertises this agent's service when more than one keep-alive
     * interval has passed since the last advertisement (or the clock has gone
     * backwards), then expires services that have gone quiet.
     */
    private void doTimeKeepingServices() {
        if (started.get()) {
            long currentTime = System.currentTimeMillis();
            if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
                doAdvertizeSelf();
                lastAdvertizeTime = currentTime;
            }
            doExpireOldServices();
        }
    }

    /**
     * Sends one advertisement for the registered service, marked "alive."
     * while the agent is started and "dead." otherwise. Does nothing when no
     * service is registered. Only the first send failure is logged.
     */
    private void doAdvertizeSelf() {
        if (selfService != null) {
            String payload = getType();
            payload += started.get() ? ALIVE : DEAD;
            payload += DELIMITER + brokerName + DELIMITER;
            payload += selfService;
            try {
                byte[] data = payload.getBytes();
                DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
                mcast.send(packet);
            } catch (IOException e) {
                if (reportAdvertizeFailed) {
                    // Report once only; this method is called on every keep-alive interval.
                    reportAdvertizeFailed = false;
                    log.error("Failed to advertise our service: " + payload, e);
                    if ("Operation not permitted".equals(e.getMessage())) {
                        log.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup. Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
                    }
                }
            }
        }
    }

    /**
     * Handles an "alive" announcement. A service seen for the first time is
     * recorded, reported to the listener on the notifier thread, and answered
     * with an advertisement of our own; in every case its last-seen time is
     * refreshed. Announcements for this agent's own service are ignored.
     *
     * @param brokerName the announcing broker
     * @param service the announced service name
     */
    private void processAlive(String brokerName, String service) {
        if (selfService == null || !service.equals(selfService)) {
            AtomicLong lastKeepAlive = services.get(service);
            if (lastKeepAlive == null) {
                brokers.put(service, brokerName);
                if (discoveryListener != null) {
                    final DiscoveryEvent event = new DiscoveryEvent(service);
                    event.setBrokerName(brokerName);

                    executor.execute(new Runnable() {
                        public void run() {
                            // Re-read the field: the listener may have changed since the event was queued.
                            DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                            if (discoveryListener != null) {
                                discoveryListener.onServiceAdd(event);
                            }
                        }
                    });
                }
                lastKeepAlive = new AtomicLong(System.currentTimeMillis());
                services.put(service, lastKeepAlive);
                doAdvertizeSelf();
            }
            lastKeepAlive.set(System.currentTimeMillis());
        }
    }

    /**
     * Handles the removal of a service. If the service was known it is
     * forgotten and the removal is reported to the listener on the notifier
     * thread. A {@code null} service, or this agent's own service, is ignored.
     *
     * @param brokerName the broker that owned the service; may be {@code null}
     * @param service the service name
     */
    private void processDead(String brokerName, String service) {
        if (service == null) {
            // Reachable through serviceFailed() with an event that carries no service name.
            return;
        }
        if (!service.equals(selfService)) {
            if (services.remove(service) != null) {
                brokers.remove(service);
                if (discoveryListener != null) {
                    final DiscoveryEvent event = new DiscoveryEvent(service);
                    event.setBrokerName(brokerName);

                    executor.execute(new Runnable() {
                        public void run() {
                            // Re-read the field: the listener may have changed since the event was queued.
                            DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                            if (discoveryListener != null) {
                                discoveryListener.onServiceRemove(event);
                            }
                        }
                    });
                }
            }
        }
    }

    /**
     * Removes every service whose last "alive" message is older than
     * {@code keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH} milliseconds.
     */
    private void doExpireOldServices() {
        long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
        for (Iterator<Map.Entry<String, AtomicLong>> i = services.entrySet().iterator(); i.hasNext();) {
            Map.Entry<String, AtomicLong> entry = i.next();
            if (entry.getValue().get() < expireTime) {
                String service = entry.getKey();
                processDead(brokers.get(service), service);
            }
        }
    }

    /**
     * Treats the service named in the event as dead, as if a "dead"
     * announcement had been received for it.
     *
     * @param event the event naming the failed service and its broker
     * @throws IOException declared by the interface
     */
    public void serviceFailed(DiscoveryEvent event) throws IOException {
        processDead(event.getBrokerName(), event.getServiceName());
    }
}