KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > discovery > multicast > MulticastDiscoveryAgent


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.discovery.multicast;
19
20 import java.io.IOException JavaDoc;
21 import java.net.DatagramPacket JavaDoc;
22 import java.net.InetAddress JavaDoc;
23 import java.net.InetSocketAddress JavaDoc;
24 import java.net.MulticastSocket JavaDoc;
25 import java.net.SocketAddress JavaDoc;
26 import java.net.SocketTimeoutException JavaDoc;
27 import java.net.URI JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.Map JavaDoc;
30
31 import org.apache.activemq.command.DiscoveryEvent;
32 import org.apache.activemq.transport.discovery.DiscoveryAgent;
33 import org.apache.activemq.transport.discovery.DiscoveryListener;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36
37 import java.util.concurrent.ConcurrentHashMap JavaDoc;
38 import java.util.concurrent.Executor JavaDoc;
39 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
40 import java.util.concurrent.ThreadFactory JavaDoc;
41 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
42 import java.util.concurrent.TimeUnit JavaDoc;
43 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
44 import java.util.concurrent.atomic.AtomicLong JavaDoc;
45 /**
46  * A {@link DiscoveryAgent} using a multicast address and heartbeat packets encoded using any
47  * wireformat, but openwire by default.
48  *
49  * @version $Revision$
50  */

51 public class MulticastDiscoveryAgent implements DiscoveryAgent,Runnable JavaDoc{
52     private static final Log log=LogFactory.getLog(MulticastDiscoveryAgent.class);
53     public static final String JavaDoc DEFAULT_DISCOVERY_URI_STRING="multicast://239.255.2.3:6155";
54     private static final String JavaDoc TYPE_SUFFIX="ActiveMQ-4.";
55     private static final String JavaDoc ALIVE="alive.";
56     private static final String JavaDoc DEAD="dead.";
57     private static final String JavaDoc DELIMITER = "%";
58     private static final int BUFF_SIZE=8192;
59     private static final int DEFAULT_IDLE_TIME=500;
60     private static final int HEARTBEAT_MISS_BEFORE_DEATH=4;
61     private int timeToLive=1;
62     private boolean loopBackMode=false;
63     private Map JavaDoc services=new ConcurrentHashMap JavaDoc();
64     private Map JavaDoc brokers = new ConcurrentHashMap JavaDoc();
65     private String JavaDoc group="default";
66     private String JavaDoc brokerName;
67     private URI JavaDoc discoveryURI;
68     private InetAddress JavaDoc inetAddress;
69     private SocketAddress JavaDoc sockAddress;
70     private DiscoveryListener discoveryListener;
71     private String JavaDoc selfService;
72     private MulticastSocket JavaDoc mcast;
73     private Thread JavaDoc runner;
74     private long keepAliveInterval=DEFAULT_IDLE_TIME;
75     private long lastAdvertizeTime=0;
76     private AtomicBoolean JavaDoc started=new AtomicBoolean JavaDoc(false);
77     private boolean reportAdvertizeFailed=true;
78     
79     private final Executor JavaDoc executor = new ThreadPoolExecutor JavaDoc(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue JavaDoc(), new ThreadFactory JavaDoc() {
80         public Thread JavaDoc newThread(Runnable JavaDoc runable) {
81             Thread JavaDoc t = new Thread JavaDoc(runable, "Multicast Discovery Agent Notifier");
82             t.setDaemon(true);
83             return t;
84         }
85     });
86
87     /**
88      * Set the discovery listener
89      *
90      * @param listener
91      */

92     public void setDiscoveryListener(DiscoveryListener listener){
93         this.discoveryListener=listener;
94     }
95
96     /**
97      * register a service
98      */

99     public void registerService(String JavaDoc name) throws IOException JavaDoc{
100         this.selfService=name;
101         if (started.get()){
102             doAdvertizeSelf();
103         }
104     }
105
106     /**
107      * Get the group used for discovery
108      *
109      * @return the group
110      */

111     public String JavaDoc getGroup(){
112         return group;
113     }
114
115     /**
116      * Set the group for discovery
117      *
118      * @param group
119      */

120     public void setGroup(String JavaDoc group){
121         this.group=group;
122     }
123
124     /**
125      * @return Returns the brokerName.
126      */

127     public String JavaDoc getBrokerName(){
128         return brokerName;
129     }
130
131     /**
132      * @param brokerName The brokerName to set.
133      */

134     public void setBrokerName(String JavaDoc brokerName){
135         if (brokerName != null){
136             brokerName = brokerName.replace('.','-');
137             brokerName = brokerName.replace(':','-');
138             brokerName = brokerName.replace('%','-');
139         this.brokerName=brokerName;
140         }
141     }
142
143     /**
144      * @return Returns the loopBackMode.
145      */

146     public boolean isLoopBackMode(){
147         return loopBackMode;
148     }
149
150     /**
151      * @param loopBackMode
152      * The loopBackMode to set.
153      */

154     public void setLoopBackMode(boolean loopBackMode){
155         this.loopBackMode=loopBackMode;
156     }
157
158     /**
159      * @return Returns the timeToLive.
160      */

161     public int getTimeToLive(){
162         return timeToLive;
163     }
164
165     /**
166      * @param timeToLive
167      * The timeToLive to set.
168      */

169     public void setTimeToLive(int timeToLive){
170         this.timeToLive=timeToLive;
171     }
172
173     /**
174      * @return the discoveryURI
175      */

176     public URI JavaDoc getDiscoveryURI(){
177         return discoveryURI;
178     }
179
180     /**
181      * Set the discoveryURI
182      *
183      * @param discoveryURI
184      */

185     public void setDiscoveryURI(URI JavaDoc discoveryURI){
186         this.discoveryURI=discoveryURI;
187     }
188
189     public long getKeepAliveInterval(){
190         return keepAliveInterval;
191     }
192
193     public void setKeepAliveInterval(long keepAliveInterval){
194         this.keepAliveInterval=keepAliveInterval;
195     }
196
197     /**
198      * start the discovery agent
199      *
200      * @throws Exception
201      */

202     public void start() throws Exception JavaDoc{
203         if(started.compareAndSet(false,true)){
204             if(group==null|| group.length()==0){
205                 throw new IOException JavaDoc("You must specify a group to discover");
206             }
207             if (brokerName == null || brokerName.length()==0){
208                 log.warn("brokerName not set");
209             }
210             String JavaDoc type=getType();
211             if(!type.endsWith(".")){
212                 log.warn("The type '"+type+"' should end with '.' to be a valid Discovery type");
213                 type+=".";
214             }
215             if(discoveryURI==null){
216                 discoveryURI=new URI JavaDoc(DEFAULT_DISCOVERY_URI_STRING);
217             }
218             this.inetAddress=InetAddress.getByName(discoveryURI.getHost());
219             this.sockAddress=new InetSocketAddress JavaDoc(this.inetAddress,discoveryURI.getPort());
220             mcast=new MulticastSocket JavaDoc(discoveryURI.getPort());
221             mcast.setLoopbackMode(loopBackMode);
222             mcast.setTimeToLive(getTimeToLive());
223             mcast.joinGroup(inetAddress);
224             mcast.setSoTimeout((int) keepAliveInterval);
225             runner=new Thread JavaDoc(this);
226             runner.setName("MulticastDiscovery: "+selfService);
227             runner.setDaemon(true);
228             runner.start();
229             doAdvertizeSelf();
230         }
231     }
232
233     /**
234      * stop the channel
235      *
236      * @throws Exception
237      */

238     public void stop() throws Exception JavaDoc{
239         if(started.compareAndSet(true,false)){
240             doAdvertizeSelf();
241             mcast.close();
242         }
243     }
244
245     public String JavaDoc getType(){
246         return group+"."+TYPE_SUFFIX;
247     }
248
249     public void run(){
250         byte[] buf=new byte[BUFF_SIZE];
251         DatagramPacket JavaDoc packet=new DatagramPacket JavaDoc(buf,0,buf.length);
252         while(started.get()){
253             doTimeKeepingServices();
254             try{
255                 mcast.receive(packet);
256                 if(packet.getLength()>0){
257                     String JavaDoc str=new String JavaDoc(packet.getData(),packet.getOffset(),packet.getLength());
258                     processData(str);
259                 }
260             } catch(SocketTimeoutException JavaDoc se){
261                 // ignore
262
} catch(IOException JavaDoc e){
263                 if( started.get() ) {
264                     log.error("failed to process packet: "+e);
265                 }
266             }
267         }
268     }
269
270     private void processData(String JavaDoc str){
271         if (discoveryListener != null){
272         if(str.startsWith(getType())){
273             String JavaDoc payload=str.substring(getType().length());
274             if(payload.startsWith(ALIVE)){
275                 String JavaDoc brokerName=getBrokerName(payload.substring(ALIVE.length()));
276                 String JavaDoc service=payload.substring(ALIVE.length()+brokerName.length()+2);
277                 if(!brokerName.equals(this.brokerName)){
278                     processAlive(brokerName,service);
279                 }
280             }else{
281                 String JavaDoc brokerName=getBrokerName(payload.substring(DEAD.length()));
282                 String JavaDoc service=payload.substring(DEAD.length()+brokerName.length()+2);
283                 if(!brokerName.equals(this.brokerName)){
284                     processDead(brokerName,service);
285                 }
286             }
287         }
288         }
289     }
290
291     private void doTimeKeepingServices(){
292         if(started.get()){
293             long currentTime=System.currentTimeMillis();
294             if (currentTime < lastAdvertizeTime || ((currentTime-keepAliveInterval)>lastAdvertizeTime)) {
295                 doAdvertizeSelf();
296                 lastAdvertizeTime = currentTime;
297             }
298             doExpireOldServices();
299         }
300     }
301
302     private void doAdvertizeSelf(){
303         if(selfService!=null ){
304             String JavaDoc payload=getType();
305             payload+=started.get()?ALIVE:DEAD;
306             payload+=DELIMITER+brokerName+DELIMITER;
307             payload+=selfService;
308             try{
309                 byte[] data=payload.getBytes();
310                 DatagramPacket JavaDoc packet=new DatagramPacket JavaDoc(data,0,data.length,sockAddress);
311                 mcast.send(packet);
312             } catch(IOException JavaDoc e) {
313                 // If a send fails, chances are all subsequent sends will fail too.. No need to keep reporting the
314
// same error over and over.
315
if( reportAdvertizeFailed ) {
316                     reportAdvertizeFailed=false;
317                     log.error("Failed to advertise our service: "+payload,e);
318                     if( "Operation not permitted".equals(e.getMessage()) ) {
319                         log.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup. Please make sure that the OS is properly configured to allow multicast traffic over: "+mcast.getLocalAddress());
320                     }
321                 }
322             }
323         }
324     }
325
326     private void processAlive(String JavaDoc brokerName,String JavaDoc service){
327         if(selfService == null || !service.equals(selfService)){
328             AtomicLong JavaDoc lastKeepAlive=(AtomicLong JavaDoc) services.get(service);
329             if(lastKeepAlive==null){
330                 brokers.put(service, brokerName);
331                 if(discoveryListener!=null){
332                     final DiscoveryEvent event=new DiscoveryEvent(service);
333                     event.setBrokerName(brokerName);
334                     
335                     // Have the listener process the event async so that
336
// he does not block this thread since we are doing time sensitive
337
// processing of events.
338
executor.execute(new Runnable JavaDoc() {
339                         public void run() {
340                             DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
341                             if(discoveryListener!=null){
342                                 discoveryListener.onServiceAdd(event);
343                             }
344                         }
345                     });
346                 }
347                 lastKeepAlive=new AtomicLong JavaDoc(System.currentTimeMillis());
348                 services.put(service,lastKeepAlive);
349                 doAdvertizeSelf();
350                 
351             }
352             lastKeepAlive.set(System.currentTimeMillis());
353         }
354     }
355
356     private void processDead(String JavaDoc brokerName,String JavaDoc service){
357         if(!service.equals(selfService)){
358             if(services.remove(service)!=null){
359                 brokers.remove(service);
360                 if(discoveryListener!=null){
361                     final DiscoveryEvent event=new DiscoveryEvent(service);
362                     event.setBrokerName(brokerName);
363                     
364                     // Have the listener process the event async so that
365
// he does not block this thread since we are doing time sensitive
366
// processing of events.
367
executor.execute(new Runnable JavaDoc() {
368                         public void run() {
369                             DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
370                             if(discoveryListener!=null){
371                                 discoveryListener.onServiceRemove(event);
372                             }
373                         }
374                     });
375                 }
376             }
377         }
378     }
379
380     private void doExpireOldServices(){
381         long expireTime=System.currentTimeMillis()-(keepAliveInterval*HEARTBEAT_MISS_BEFORE_DEATH);
382         for(Iterator JavaDoc i=services.entrySet().iterator();i.hasNext();){
383             Map.Entry JavaDoc entry=(Map.Entry JavaDoc) i.next();
384             AtomicLong JavaDoc lastHeartBeat=(AtomicLong JavaDoc) entry.getValue();
385             if(lastHeartBeat.get()<expireTime){
386                 String JavaDoc brokerName = (String JavaDoc)brokers.get(entry.getKey());
387                 processDead(brokerName,entry.getKey().toString());
388             }
389         }
390     }
391     
392     private String JavaDoc getBrokerName(String JavaDoc str){
393         String JavaDoc result = null;
394         int start = str.indexOf(DELIMITER);
395         if (start >= 0 ){
396             int end = str.indexOf(DELIMITER,start+1);
397             result=str.substring(start+1, end);
398         }
399         return result;
400     }
401
402     public void serviceFailed(DiscoveryEvent event) throws IOException JavaDoc {
403         processDead(event.getBrokerName(), event.getServiceName());
404     }
405 }
406
Popular Tags