KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > remoting > detection > AbstractDetector


1 /***************************************
2  * *
3  * JBoss: The OpenSource J2EE WebOS *
4  * *
5  * Distributable under LGPL license. *
6  * See terms of license at gnu.org. *
7  * *
8  ***************************************/

9 package org.jboss.remoting.detection;
10
11
12 import java.util.ArrayList JavaDoc;
13 import java.util.HashMap JavaDoc;
14 import java.util.HashSet JavaDoc;
15 import java.util.Iterator JavaDoc;
16 import java.util.List JavaDoc;
17 import java.util.Map JavaDoc;
18 import java.util.Set JavaDoc;
19 import java.util.Timer JavaDoc;
20 import java.util.TimerTask JavaDoc;
21 import javax.management.MBeanServer JavaDoc;
22 import javax.management.ObjectName JavaDoc;
23 import org.jboss.logging.Logger;
24 import org.jboss.remoting.ConnectionValidator;
25 import org.jboss.remoting.InvokerLocator;
26 import org.jboss.remoting.InvokerRegistry;
27 import org.jboss.remoting.ServerInvoker;
28 import org.jboss.remoting.ident.Identity;
29 import org.jboss.remoting.network.NetworkRegistryFinder;
30 import org.jboss.remoting.transport.ClientInvoker;
31 import org.w3c.dom.Element JavaDoc;
32 import org.w3c.dom.Node JavaDoc;
33 import org.w3c.dom.NodeList JavaDoc;
34
35
36 /**
37  * AbstractDetector
38  *
39  * @author <a HREF="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
40  * @author <a HREF="mailto:tom.elrod@jboss.com">Tom Elrod</a>
41  * @version $Revision: 1.7 $
42  */

43 public abstract class AbstractDetector implements Detector, AbstractDetectorMBean
44 {
45    private long defaultTimeDelay = 5000;
46    private long heartbeatTimeDelay = 1000;
47    protected final Logger log = Logger.getLogger(getClass());
48    protected MBeanServer JavaDoc mbeanserver;
49    protected ObjectName JavaDoc objectName;
50    protected ObjectName JavaDoc registryObjectName;
51
52    private Identity myself;
53    private Timer JavaDoc heartbeatTimer;
54    private Timer JavaDoc failureTimer;
55    private Map JavaDoc servers = new HashMap JavaDoc();
56    private Element JavaDoc xml;
57    private Set JavaDoc domains = new HashSet JavaDoc();
58
59    public AbstractDetector()
60    {
61    }
62
63    /**
64     * The amount of time to wait between sending (and sometimes receiving) detection messages.
65     *
66     * @param heartbeatTimeDelay
67     * @throws IllegalArgumentException
68     */

69    public void setHeartbeatTimeDelay(long heartbeatTimeDelay)
70    {
71       if(heartbeatTimeDelay > 0 && heartbeatTimeDelay < defaultTimeDelay)
72       {
73          this.heartbeatTimeDelay = heartbeatTimeDelay;
74       }
75       else
76       {
77          throw new IllegalArgumentException JavaDoc("Can not set heartbeat time delay (" + heartbeatTimeDelay + ") to a negative number or " +
78                                             "to a number greater than the default time delay (" + defaultTimeDelay + ").");
79       }
80    }
81
82    /**
83     * The amount of time to wait between sending (and sometimes receiving) detection messages.
84     *
85     * @return
86     */

87    public long getHeartbeatTimeDelay()
88    {
89       return heartbeatTimeDelay;
90    }
91
92    /**
93     * The amount of time which can elapse without receiving a detection event before a server
94     * will be suspected as being dead and peroforming an explicit invocation on it to verify it is alive.
95     *
96     * @param defaultTimeDelay time in milliseconds
97     * @throws IllegalArgumentException
98     */

99    public void setDefaultTimeDelay(long defaultTimeDelay)
100    {
101       if(defaultTimeDelay >= heartbeatTimeDelay)
102       {
103          this.defaultTimeDelay = defaultTimeDelay;
104       }
105       else
106       {
107          throw new IllegalArgumentException JavaDoc("Can not set the default time delay (" + defaultTimeDelay + ") to be less" +
108                                             " than that of the heartbeat time delay (" + heartbeatTimeDelay + ").");
109       }
110    }
111
112    /**
113     * @return The amount of time which can elapse without receiving a detection event before a server
114     * will be suspected as being dead and peroforming an explicit invocation on it to verify it is alive.
115     */

116    public long getDefaultTimeDelay()
117    {
118       return defaultTimeDelay;
119    }
120
121    /**
122     * Will create a detection message based on the server invokers registered within the local InvokerRegistry.
123     * The detection message will contain the identity and array of server invoker metadata.
124     *
125     * @return
126     */

127    public Detection createDetection()
128    {
129       Detection detection = null;
130
131       ServerInvoker invokers[] = InvokerRegistry.getServerInvokers();
132       if(invokers == null || invokers.length <= 0)
133       {
134          return detection;
135       }
136       List JavaDoc l = new ArrayList JavaDoc(invokers.length);
137       for(int c = 0; c < invokers.length; c++)
138       {
139          if(invokers[c].isStarted())
140          {
141             ServerInvokerMetadata serverInvoker = new ServerInvokerMetadata(invokers[c].getLocator(),
142                                                                             invokers[c].getSupportedSubsystems());
143             l.add(serverInvoker);
144          }
145       }
146       if(l.isEmpty())
147       {
148          return detection;
149       }
150       ServerInvokerMetadata metadata[] = (ServerInvokerMetadata[]) l.toArray(new ServerInvokerMetadata[l.size()]);
151       detection = new Detection(Identity.get(mbeanserver), metadata);
152       return detection;
153    }
154
155    /**
156     * called by MBeanServer to start the mbean lifecycle
157     *
158     * @throws Exception
159     */

160    public void start() throws Exception JavaDoc
161    {
162       // get our own identity
163
myself = Identity.get(mbeanserver);
164
165       // add my domain if domains empty and xml not set
166
if(domains.isEmpty() && xml == null)
167       {
168          domains.add(myself.getDomain());
169       }
170
171       // find our NetworkRegistry
172
registryObjectName = NetworkRegistryFinder.find(mbeanserver);
173       if(registryObjectName == null)
174       {
175          log.warn("Detector: " + getClass().getName() + " could not be loaded because the NetworkRegistry is not registered");
176          log.warn("This means that only the broadcasting of detection messages will be functional and will not be able to discover other servers.");
177       }
178
179       startPinger(getPingerDelay(), getPingerPeriod());
180       startHeartbeat(getHeartbeatDelay(), getHeartbeatPeriod());
181    }
182
183    /**
184     * return the delay in milliseconds between when the timer is created to when the first pinger thread runs.
185     * defaults to <tt>5000</tt>
186     *
187     * @return
188     */

189    protected long getPingerDelay()
190    {
191       return 5000;
192    }
193
194    /**
195     * return the period in milliseconds between checking lost servers against the last detection timestamp.
196     * defaults to <tt>1500</tt>
197     *
198     * @return
199     */

200    protected long getPingerPeriod()
201    {
202       return 1500;
203    }
204
205    /**
206     * start the pinger timer thread
207     *
208     * @param delay
209     * @param period
210     */

211    protected void startPinger(long delay, long period)
212    {
213       failureTimer = new Timer JavaDoc(false);
214       failureTimer.schedule(new FailureDetector(), delay, period);
215    }
216
217    /**
218     * stop the pinger timer thread
219     */

220    protected void stopPinger()
221    {
222       if(failureTimer != null)
223       {
224          failureTimer.cancel();
225          failureTimer = null;
226       }
227    }
228
229    /**
230     * called by the MBeanServer to stop the mbean lifecycle
231     *
232     * @throws Exception
233     */

234    public void stop() throws Exception JavaDoc
235    {
236       stopPinger();
237       stopHeartbeat();
238       stopPinger();
239    }
240
241    public void postDeregister()
242    {
243    }
244
245    public void postRegister(Boolean JavaDoc aBoolean)
246    {
247    }
248
249    public void preDeregister() throws Exception JavaDoc
250    {
251    }
252
253    public ObjectName JavaDoc preRegister(MBeanServer JavaDoc mBeanServer, ObjectName JavaDoc objectName) throws Exception JavaDoc
254    {
255       this.mbeanserver = mBeanServer;
256       this.objectName = objectName;
257       return objectName;
258    }
259
260    /**
261     * set the configuration for the domains to be recognized by detector
262     *
263     * @param xml
264     * @jmx.managed-attribute description="Configuration is an xml element indicating domains to be recognized by detector"
265     * access="read-write"
266     */

267    public void setConfiguration(Element JavaDoc xml)
268          throws Exception JavaDoc
269    {
270       this.xml = xml;
271
272       // check configuration xml
273
if(xml != null)
274       {
275          // clearing collection of domains since have new ones to set
276
domains.clear();
277
278          NodeList JavaDoc domainNodes = xml.getElementsByTagName("domain");
279          if(domainNodes == null || domainNodes.getLength() <= 0)
280          {
281             // no domains specified, so will accept all domains
282
log.debug("No domains specified. Will accept all domains.");
283          }
284          int len = domainNodes.getLength();
285          for(int c = 0; c < len; c++)
286          {
287             Node JavaDoc node = domainNodes.item(c);
288             String JavaDoc domain = node.getFirstChild().getNodeValue();
289             domains.add(domain);
290             log.debug("Added domain " + domain + " to detector list.");
291          }
292       }
293    }
294
295    /**
296     * The <code>getConfiguration</code> method
297     *
298     * @return an <code>Element</code> value
299     * @jmx.managed-attribute
300     */

301    public Element JavaDoc getConfiguration()
302    {
303       return xml;
304    }
305
306    //----------------------- protected
307

308    /**
309     * start heartbeating
310     *
311     * @param delay
312     * @param period
313     */

314    protected void startHeartbeat(long delay, long period)
315    {
316       if(heartbeatTimer == null)
317       {
318          heartbeatTimer = new Timer JavaDoc(false);
319       }
320       heartbeatTimer.schedule(new Heartbeat(), delay, period);
321    }
322
323    /**
324     * stop heartbeating
325     */

326    protected void stopHeartbeat()
327    {
328       if(heartbeatTimer != null)
329       {
330          try
331          {
332             heartbeatTimer.cancel();
333          }
334          catch(Exception JavaDoc eg)
335          {
336          }
337          heartbeatTimer = null;
338       }
339    }
340
341    /**
342     * return the initial delay in milliseconds before the initial heartbeat is fired.
343     * Defaults to <tt>0</tt>
344     *
345     * @return
346     */

347    protected long getHeartbeatDelay()
348    {
349       return 0;
350    }
351
352    /**
353     * return the period in milliseconds between subsequent heartbeats. Defaults to
354     * <tt>1000</tt>
355     *
356     * @return
357     */

358    protected long getHeartbeatPeriod()
359    {
360       return heartbeatTimeDelay;
361    }
362
363    /**
364     * subclasses must implement to provide the specific heartbeat protocol
365     * for this server to send out to other servers on the network
366     */

367    protected abstract void heartbeat();
368
369    /**
370     * called when a remote detection from a peer is received by a detector
371     *
372     * @param detection
373     */

374    protected void detect(Detection detection)
375    {
376       if(log.isTraceEnabled())
377       {
378          log.trace("Detection message received.");
379          log.trace("Id = " + detection.getIdentity().getInstanceId());
380          log.trace("isRemoteDetection() = " + isRemoteDetection(detection));
381       }
382       // we only track detections within our own domain and not ourself
383
if(isRemoteDetection(detection))
384       {
385          try
386          {
387             boolean found = false;
388             Server JavaDoc server = null;
389
390             synchronized(servers)
391             {
392                server = (Server JavaDoc) servers.get(detection);
393                found = server != null;
394                if(!found)
395                {
396                   // update either way the timestamp and the detection
397
servers.put(detection, (server = new Server JavaDoc(detection)));
398                }
399                else
400                {
401                   server.lastDetection = System.currentTimeMillis();
402                }
403             }
404             if(found == false)
405             {
406                if(registryObjectName != null)
407                {
408                   log.debug("detected NEW server: " + detection.getIdentity());
409                   mbeanserver.invoke(registryObjectName, "addServer", new Object JavaDoc[]{detection.getIdentity(),
410                                                                                    detection.getServerInvokers()},
411                                      new String JavaDoc[]{Identity.class.getName(), ServerInvokerMetadata[].class.getName()});
412                }
413             }
414             else
415             {
416                if(server.changed(detection))
417                {
418                   // update hash
419
server.rehash(detection);
420                   if(registryObjectName != null)
421                   {
422                      if(log.isTraceEnabled())
423                      {
424                         log.trace("detected UPDATE for server: " + detection.getIdentity());
425                      }
426                      mbeanserver.invoke(registryObjectName, "updateServer", new Object JavaDoc[]{detection.getIdentity(),
427                                                                                          detection.getServerInvokers()},
428                                         new String JavaDoc[]{Identity.class.getName(), ServerInvokerMetadata[].class.getName()});
429                   }
430                }
431             }
432          }
433          catch(javax.management.InstanceNotFoundException JavaDoc inf)
434          {
435             return;
436          }
437          catch(Exception JavaDoc e)
438          {
439             log.error("Error during detection of: " + detection, e);
440          }
441       }
442       else if(log.isTraceEnabled())
443       {
444          log.trace("detection from myself - ignored");
445       }
446    }
447
448    protected boolean isRemoteDetection(Detection detection)
449    {
450       String JavaDoc domain = detection.getIdentity().getDomain();
451       // is detection domain in accepted domain collection and not local
452
// if domains empty, then accept all
453
return (domains.isEmpty() || domains.contains(domain)) &&
454              myself.isSameJVM(detection.getIdentity()) == false;
455    }
456
457    protected boolean checkInvokerServer(Detection detection, ClassLoader JavaDoc cl)
458    {
459       boolean ok = false;
460       InvokerLocator il[] = detection.getLocators();
461       for(int c = 0; c < il.length; c++)
462       {
463          try
464          {
465             ClientInvoker ci = InvokerRegistry.createClientInvoker(il[c]);
466
467             if(ci.isConnected() == false)
468             {
469                ci.connect();
470             }
471
472             boolean isValid = ConnectionValidator.checkConnection(ci);
473             if(isValid)
474             {
475                // the transport was successful
476
ok = true;
477                break;
478             }
479
480          }
481          catch(Throwable JavaDoc ig)
482          {
483             log.debug("failed calling ping on " + detection, ig);
484             // remove the client invoker, it's not any good anymore
485
InvokerRegistry.destroyClientInvoker(il[c]);
486             // will be aggressive with removal. if any fail, remove it.
487
// if still good, will pick up detection again next go around.
488
break;
489          }
490       }
491       if(ok == false)
492       {
493          // the server is down!
494
try
495          {
496             if(registryObjectName != null)
497             {
498                mbeanserver.invoke(registryObjectName, "removeServer", new Object JavaDoc[]{detection.getIdentity()},
499                                   new String JavaDoc[]{Identity.class.getName()});
500                log.debug("Removed detection " + detection);
501             }
502          }
503          catch(Exception JavaDoc ex)
504          {
505             log.warn("Error removing server", ex);
506          }
507          finally
508          {
509             // remove this server, it isn't available any more
510
servers.remove(detection);
511          }
512       }
513
514       return ok;
515    }
516
517
518    private final class FailureDetector extends TimerTask JavaDoc
519    {
520       public void run()
521       {
522          if(servers.isEmpty())
523          {
524             return;
525          }
526          // make a copy so we don't have to block incoming
527
// notifications during failure check
528
Map JavaDoc map = null;
529          synchronized(servers)
530          {
531             map = new HashMap JavaDoc(servers);
532          }
533          ClassLoader JavaDoc cl = AbstractDetector.this.getClass().getClassLoader();
534          // walk through each detection and see if it needs checking up on ...
535
Iterator JavaDoc iter = map.keySet().iterator();
536          while(iter.hasNext())
537          {
538             Detection detection = (Detection) iter.next();
539             long lastDetection = 0;
540             Server JavaDoc server = null;
541             synchronized(servers)
542             {
543                server = (Server JavaDoc) map.get(detection);
544                lastDetection = server.lastDetection;
545             }
546             long duration = System.currentTimeMillis() - lastDetection;
547             if(duration >= defaultTimeDelay)
548             {
549                if(log.isTraceEnabled())
550                {
551                   log.trace("detection for: " + detection + " has not been received in: " + defaultTimeDelay + " ms, contacting..");
552                }
553                // OK, we've exceeded the time delay since the last time we've detected
554
// this dude, he might be down, let's walk through each of his transports and
555
// see if any of them lead to a valid invocation
556
if(checkInvokerServer(detection, cl))
557                {
558                   if(log.isTraceEnabled())
559                   {
560                      log.trace("detection for: " + detection + " recovered on ping");
561                   }
562                   server.lastDetection = System.currentTimeMillis();
563                }
564             }
565          }
566       }
567
568    }
569
570    private final class Server
571    {
572       Detection detection;
573       private int hashCode = 0;
574       long lastDetection = System.currentTimeMillis();
575
576       Server(Detection detection)
577       {
578          rehash(detection);
579       }
580
581       private void rehash(Detection d)
582       {
583          this.hashCode = hash(d);
584       }
585
586       private int hash(Detection d)
587       {
588          int hc = 0;
589          InvokerLocator locators[] = d.getLocators();
590          if(locators != null)
591          {
592             for(int c = 0; c < locators.length; c++)
593             {
594                hc += locators[c].hashCode();
595             }
596          }
597          return hc;
598       }
599
600       boolean changed(Detection detection)
601       {
602          return hashCode != hash(detection);
603       }
604
605       public boolean equals(Object JavaDoc obj)
606       {
607          return obj instanceof Server JavaDoc && hashCode == obj.hashCode();
608       }
609
610       public int hashCode()
611       {
612          return hashCode;
613       }
614    }
615
616    private final class Heartbeat extends TimerTask JavaDoc
617    {
618       public void run()
619       {
620          InvokerLocator il[] = InvokerRegistry.getRegisteredServerLocators();
621          if(il != null && il.length > 0)
622          {
623             // we only heartbeat if we have connectors and the ability for a
624
// client to reach us back, otherwise its sort of a mute point ..
625
heartbeat();
626          }
627       }
628    }
629 }
630
Popular Tags