KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > mx4j > remote > AbstractRemoteNotificationClientHandler


1 /*
2  * Copyright (C) The MX4J Contributors.
3  * All rights reserved.
4  *
5  * This software is distributed under the terms of the MX4J License version 1.0.
6  * See the terms of the MX4J License in the documentation provided with this software.
7  */

8
9 package mx4j.remote;
10
11 import java.io.IOException JavaDoc;
12 import java.util.ArrayList JavaDoc;
13 import java.util.Arrays JavaDoc;
14 import java.util.HashMap JavaDoc;
15 import java.util.Iterator JavaDoc;
16 import java.util.LinkedList JavaDoc;
17 import java.util.List JavaDoc;
18 import java.util.Map JavaDoc;
19 import javax.management.Notification JavaDoc;
20 import javax.management.NotificationFilter JavaDoc;
21 import javax.management.NotificationListener JavaDoc;
22 import javax.management.remote.NotificationResult JavaDoc;
23 import javax.management.remote.TargetedNotification JavaDoc;
24
25 import mx4j.log.Log;
26 import mx4j.log.Logger;
27
28 /**
29  * Base implementation of the RemoteNotificationClientHandler interface.
30  *
31  * @version $Revision: 1.7 $
32  */

33 public abstract class AbstractRemoteNotificationClientHandler implements RemoteNotificationClientHandler
34 {
35    private static int fetcherID;
36    private static int delivererID;
37
38    private final ConnectionNotificationEmitter emitter;
39    private final HeartBeat heartbeat;
40    private final Map JavaDoc tuples = new HashMap JavaDoc();
41    private NotificationFetcherThread fetcherThread;
42    private NotificationDelivererThread delivererThread;
43
44    /**
45     * Creates a new remote notification client-side handler.
46     * It uses an emitter, an heartbeat and an environment to perform the job.
47     * All 3 can be null, but the corrispondent methods must be overridden
48     *
49     * @param emitter The NotificationEmitter that emits connection failures notifications
50     * @param heartbeat The heart beat is used to get the retry parameters in case of connection failure
51     * @param environment Contains environment variables used to configure this handler
52     * @see MX4JRemoteConstants#FETCH_NOTIFICATIONS_MAX_NUMBER
53     * @see MX4JRemoteConstants#FETCH_NOTIFICATIONS_SLEEP
54     * @see MX4JRemoteConstants#FETCH_NOTIFICATIONS_TIMEOUT
55     * @see #sendConnectionNotificationLost
56     * @see #getMaxRetries
57     * @see #getRetryPeriod
58     */

59    protected AbstractRemoteNotificationClientHandler(ConnectionNotificationEmitter emitter, HeartBeat heartbeat, Map JavaDoc environment)
60    {
61       this.emitter = emitter;
62       this.heartbeat = heartbeat;
63       this.fetcherThread = new NotificationFetcherThread(environment);
64       this.delivererThread = new NotificationDelivererThread(environment);
65    }
66
67    /**
68     * Returns whether this client handler is fetching notifications or not.
69     *
70     * @see #start
71     * @see #stop
72     */

73    public boolean isActive()
74    {
75       return fetcherThread.isActive();
76    }
77
78    public void start()
79    {
80       if (isActive()) return;
81       delivererThread.start();
82       fetcherThread.start();
83    }
84
85    public void stop()
86    {
87       if (!isActive()) return;
88       fetcherThread.stop();
89       delivererThread.stop();
90       synchronized (tuples)
91       {
92          tuples.clear();
93       }
94    }
95
96    private synchronized static int getFetcherID()
97    {
98       return ++fetcherID;
99    }
100
101    private synchronized static int getDelivererID()
102    {
103       return ++delivererID;
104    }
105
106    public boolean contains(NotificationTuple tuple)
107    {
108       synchronized (tuples)
109       {
110          return tuples.containsValue(tuple);
111       }
112    }
113
114    public void addNotificationListener(Integer JavaDoc id, NotificationTuple tuple)
115    {
116       if (!isActive()) start();
117
118       synchronized (tuples)
119       {
120          tuples.put(id, tuple);
121       }
122
123       Logger logger = getLogger();
124       if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Adding remote NotificationListener " + tuple);
125    }
126
127    public Integer JavaDoc[] getNotificationListeners(NotificationTuple tuple)
128    {
129       synchronized (tuples)
130       {
131          ArrayList JavaDoc ids = new ArrayList JavaDoc();
132          for (Iterator JavaDoc i = tuples.entrySet().iterator(); i.hasNext();)
133          {
134             Map.Entry JavaDoc entry = (Map.Entry JavaDoc)i.next();
135             if (entry.getValue().equals(tuple)) ids.add(entry.getKey());
136          }
137          if (ids.size() > 0) return (Integer JavaDoc[])ids.toArray(new Integer JavaDoc[ids.size()]);
138       }
139       return null;
140    }
141
142    public Integer JavaDoc getNotificationListener(NotificationTuple tuple)
143    {
144       synchronized (tuples)
145       {
146          for (Iterator JavaDoc i = tuples.entrySet().iterator(); i.hasNext();)
147          {
148             Map.Entry JavaDoc entry = (Map.Entry JavaDoc)i.next();
149             if (entry.getValue().equals(tuple)) return (Integer JavaDoc)entry.getKey();
150          }
151       }
152       return null;
153    }
154
155    public void removeNotificationListeners(Integer JavaDoc[] ids)
156    {
157       Logger logger = getLogger();
158       synchronized (tuples)
159       {
160          for (int i = 0; i < ids.length; ++i)
161          {
162             Integer JavaDoc id = ids[i];
163             NotificationTuple tuple = (NotificationTuple)tuples.remove(id);
164             if (tuple != null && logger.isEnabledFor(Logger.DEBUG)) logger.debug("Removing remote NotificationListener " + tuple);
165          }
166       }
167    }
168
169    /**
170     * Calls the server side to fetch notifications.
171     */

172    protected abstract NotificationResult JavaDoc fetchNotifications(long sequence, int maxNumber, long timeout) throws IOException JavaDoc;
173
174    /**
175     * Returns the period between two retries if the connection with the server side fails.
176     * This implementation returns the heartbeat pulse period, but can be overridden.
177     *
178     * @see #getMaxRetries
179     * @see #AbstractRemoteNotificationClientHandler
180     */

181    protected long getRetryPeriod()
182    {
183       return heartbeat.getPulsePeriod();
184    }
185
186    /**
187     * Returns the maximum number of attempts that should be made before declaring a connection
188     * failed.
189     * This implementation returns the heartbeat max retries, but can be overridden.
190     *
191     * @see #getRetryPeriod
192     * @see #AbstractRemoteNotificationClientHandler
193     */

194    protected int getMaxRetries()
195    {
196       return heartbeat.getMaxRetries();
197    }
198
199    /**
200     * Sends the {@link javax.management.remote.JMXConnectionNotification#NOTIFS_LOST} notification
201     * using the emitter passed to {@link AbstractRemoteNotificationClientHandler}
202     */

203    protected void sendConnectionNotificationLost(long number)
204    {
205       emitter.sendConnectionNotificationLost(number);
206    }
207
208    protected int getNotificationsCount()
209    {
210       return delivererThread.getNotificationsCount();
211    }
212
213    private int deliverNotifications(TargetedNotification JavaDoc[] notifications)
214    {
215       return delivererThread.addNotifications(notifications);
216    }
217
218    private void sendNotification(TargetedNotification JavaDoc notification)
219    {
220       NotificationTuple tuple = null;
221       synchronized (tuples)
222       {
223          tuple = (NotificationTuple)tuples.get(notification.getListenerID());
224       }
225
226       // It may be possible that a notification arrived after the client already removed the listener
227
if (tuple == null) return;
228
229       Notification JavaDoc notif = notification.getNotification();
230
231       Logger logger = getLogger();
232
233       if (tuple.getInvokeFilter())
234       {
235          // Invoke the filter on client side
236
NotificationFilter JavaDoc filter = tuple.getNotificationFilter();
237          if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Filtering notification " + notif + ", filter = " + filter);
238          if (filter != null)
239          {
240             try
241             {
242                boolean deliver = filter.isNotificationEnabled(notif);
243                if (!deliver) return;
244             }
245             catch (Throwable JavaDoc x)
246             {
247                logger.warn("Throwable caught from isNotificationEnabled, filter = " + filter, x);
248                // And go on quietly
249
}
250          }
251       }
252
253       if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Sending Notification " + notif + ", listener info is " + tuple);
254
255       NotificationListener JavaDoc listener = tuple.getNotificationListener();
256
257       try
258       {
259          listener.handleNotification(notif, tuple.getHandback());
260       }
261       catch (Throwable JavaDoc x)
262       {
263          logger.warn("Throwable caught from handleNotification, listener = " + listener, x);
264          // And return quietly
265
}
266    }
267
268    protected Logger getLogger()
269    {
270       return Log.getLogger(getClass().getName());
271    }
272
273    private class NotificationFetcherThread implements Runnable JavaDoc
274    {
275       private long sequenceNumber;
276       private volatile boolean active;
277       private Thread JavaDoc thread;
278       private long timeout;
279       private int maxNumber;
280       private long sleep;
281
282       private NotificationFetcherThread(Map JavaDoc environment)
283       {
284          // Default server timeout is one minute
285
timeout = 60 * 1000;
286          // At most 25 notifications at time
287
maxNumber = 25;
288          // By default we don't sleep and we call the server again.
289
sleep = 0;
290          if (environment != null)
291          {
292             try
293             {
294                timeout = ((Long JavaDoc)environment.get(MX4JRemoteConstants.FETCH_NOTIFICATIONS_TIMEOUT)).longValue();
295             }
296             catch (Exception JavaDoc ignored)
297             {
298             }
299             try
300             {
301                maxNumber = ((Integer JavaDoc)environment.get(MX4JRemoteConstants.FETCH_NOTIFICATIONS_MAX_NUMBER)).intValue();
302             }
303             catch (Exception JavaDoc ignored)
304             {
305             }
306             try
307             {
308                sleep = ((Integer JavaDoc)environment.get(MX4JRemoteConstants.FETCH_NOTIFICATIONS_SLEEP)).intValue();
309             }
310             catch (Exception JavaDoc ignored)
311             {
312             }
313          }
314       }
315
316       private synchronized long getSequenceNumber()
317       {
318          return sequenceNumber;
319       }
320
321       private synchronized void setSequenceNumber(long sequenceNumber)
322       {
323          this.sequenceNumber = sequenceNumber;
324       }
325
326       private boolean isActive()
327       {
328          return active;
329       }
330
331       private synchronized void start()
332       {
333          active = true;
334          // Initialized to a negative value for the first fetchNotification call
335
sequenceNumber = -1;
336          thread = new Thread JavaDoc(this, "Notification Fetcher #" + getFetcherID());
337          thread.setDaemon(true);
338          thread.start();
339       }
340
341       private synchronized void stop()
342       {
343          active = false;
344          thread.interrupt();
345       }
346
347       public void run()
348       {
349          Logger logger = getLogger();
350          try
351          {
352             while (isActive() && !thread.isInterrupted())
353             {
354                try
355                {
356                   long sequence = getSequenceNumber();
357                   NotificationResult JavaDoc result = fetchNotifications(sequence, maxNumber, timeout);
358                   if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Fetched Notifications: " + result);
359
360                   long sleepTime = sleep;
361                   if (result != null)
362                   {
363                      long nextSequence = result.getNextSequenceNumber();
364                      TargetedNotification JavaDoc[] targeted = result.getTargetedNotifications();
365                      int targetedLength = targeted == null ? 0 : targeted.length;
366                      boolean notifsFilteredByServer = sequence >= 0 ? nextSequence - sequence != targetedLength : false;
367                      boolean notifsLostByServer = sequence >= 0 && result.getEarliestSequenceNumber() > sequence;
368                      if (notifsFilteredByServer)
369                      {
370                         // We lost some notification
371
sendConnectionNotificationLost(nextSequence - sequence - targetedLength);
372                      }
373                      if (notifsLostByServer)
374                      {
375                         // We lost some notification
376
sendConnectionNotificationLost(result.getEarliestSequenceNumber() - sequence);
377                      }
378
379                      setSequenceNumber(nextSequence);
380                      int delivered = deliverNotifications(targeted);
381                      if (delivered < targetedLength)
382                      {
383                         // We lost some notification
384
sendConnectionNotificationLost(targetedLength - delivered);
385                      }
386
387                      // If we got a maxNumber of notifications, probably the server has more to send, don't sleep
388
if (targeted != null && targeted.length == maxNumber) sleepTime = 0;
389                   }
390
391                   if (sleepTime > 0) Thread.sleep(sleepTime);
392                }
393                catch (IOException JavaDoc x)
394                {
395                   if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Caught IOException from fetchNotifications", x);
396                   break;
397                }
398                catch (InterruptedException JavaDoc x)
399                {
400                   Thread.currentThread().interrupt();
401                   break;
402                }
403                catch (Throwable JavaDoc x)
404                {
405                   if (logger.isEnabledFor(Logger.WARN)) logger.warn("Caught an unexpected exception", x);
406                }
407             }
408          }
409          finally
410          {
411             AbstractRemoteNotificationClientHandler.this.stop();
412             if (logger.isEnabledFor(Logger.DEBUG)) logger.debug(thread.getName() + " Thread exited");
413          }
414       }
415
416       /**
417        * Fetches notifications from the server side in a separate thread.
418        * Since it involves a remote call, IOExceptions must be handled carefully.
419        * If the connection fails for any reason, the thread will be a sleep and then
420        * retry for a configurable number of times.
421        * If the connection is really lost, the thread will exit.
422        */

423       private NotificationResult JavaDoc fetchNotifications(long sequence, int maxNumber, long timeout) throws IOException JavaDoc, InterruptedException JavaDoc
424       {
425          Logger logger = getLogger();
426          int retries = 0;
427          while (true)
428          {
429             if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Fetching notifications, sequence is " + sequence + ", timeout is " + timeout);
430             try
431             {
432                return AbstractRemoteNotificationClientHandler.this.fetchNotifications(sequence, maxNumber, timeout);
433             }
434             catch (IOException JavaDoc x)
435             {
436                if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Could not fetch notifications, sleeping " + getRetryPeriod() + " and trying " + (getMaxRetries() - retries) + " more times", x);
437                Thread.sleep(getRetryPeriod());
438                if (retries++ == getMaxRetries()) throw x;
439             }
440          }
441       }
442    }
443
444    private class NotificationDelivererThread implements Runnable JavaDoc
445    {
446       private final List JavaDoc notificationQueue = new LinkedList JavaDoc();
447       private int capacity;
448       private volatile boolean active;
449       private Thread JavaDoc thread;
450
451       private NotificationDelivererThread(Map JavaDoc environment)
452       {
453          if (environment != null)
454          {
455             Object JavaDoc size = environment.get(MX4JRemoteConstants.NOTIFICATION_QUEUE_CAPACITY);
456             if (size instanceof Integer JavaDoc)
457             {
458                capacity = ((Integer JavaDoc)size).intValue();
459                if (capacity < 0) capacity = 0;
460             }
461          }
462       }
463
464       private int addNotifications(TargetedNotification JavaDoc[] notifications)
465       {
466          if (notifications == null || notifications.length == 0) return 0;
467
468          List JavaDoc notifs = Arrays.asList(notifications);
469
470          Logger logger = getLogger();
471          if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Enqueuing notifications for delivery: " + notifs);
472
473          synchronized (this)
474          {
475             int size = notifs.size();
476             int added = size;
477             if (capacity > 0)
478             {
479                int room = capacity - notificationQueue.size();
480                if (room < size)
481                {
482                   added = room;
483                   if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Notification queue is full, enqueued " + room + " notifications out of " + size + ", exceeding will be lost");
484                }
485                notificationQueue.addAll(notifs.subList(0, added));
486             }
487             else
488             {
489                notificationQueue.addAll(notifs);
490             }
491             notifyAll();
492             return added;
493          }
494       }
495
496       private boolean isActive()
497       {
498          return active;
499       }
500
501       private synchronized void start()
502       {
503          active = true;
504          notificationQueue.clear();
505          thread = new Thread JavaDoc(this, "Notification Deliverer #" + getDelivererID());
506          thread.setDaemon(true);
507          thread.start();
508       }
509
510       private synchronized void stop()
511       {
512          active = false;
513          thread.interrupt();
514       }
515
516       public void run()
517       {
518          Logger logger = getLogger();
519          try
520          {
521             while (isActive() && !thread.isInterrupted())
522             {
523                try
524                {
525                   TargetedNotification JavaDoc notification = null;
526                   synchronized (this)
527                   {
528                      while (notificationQueue.isEmpty()) wait();
529                      notification = (TargetedNotification JavaDoc)notificationQueue.remove(0);
530                   }
531                   sendNotification(notification);
532                }
533                catch (InterruptedException JavaDoc x)
534                {
535                   Thread.currentThread().interrupt();
536                   break;
537                }
538                catch (Throwable JavaDoc x)
539                {
540                   if (logger.isEnabledFor(Logger.WARN)) logger.warn("Caught an unexpected exception", x);
541                }
542             }
543          }
544          finally
545          {
546             active = false;
547             if (logger.isEnabledFor(Logger.DEBUG)) logger.debug(thread.getName() + " Thread exited");
548          }
549       }
550
551       private int getNotificationsCount()
552       {
553          synchronized (this)
554          {
555             return notificationQueue.size();
556          }
557       }
558    }
559 }
560
Popular Tags