KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > mx4j > remote > DefaultRemoteNotificationServerHandler


/*
 * Copyright (C) The MX4J Contributors.
 * All rights reserved.
 *
 * This software is distributed under the terms of the MX4J License version 1.0.
 * See the terms of the MX4J License in the documentation provided with this software.
 */

package mx4j.remote;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.remote.NotificationResult;
import javax.management.remote.TargetedNotification;

import mx4j.log.Log;
import mx4j.log.Logger;

28 /**
29  * Base implementation of the RemoteNotificationServerHandler interface.
30  *
31  * @version $Revision: 1.12 $
32  */

33 public class DefaultRemoteNotificationServerHandler implements RemoteNotificationServerHandler
34 {
35    private static int listenerID;
36
37    private final NotificationListener JavaDoc listener;
38    private final Map JavaDoc tuples = new HashMap JavaDoc();
39    private final NotificationBuffer buffer;
40    private volatile boolean closed;
41
42    /**
43     * Creates a new remote notification server handler.
44     *
45     * @param environment Contains environment variables used to configure this handler
46     * @see MX4JRemoteConstants#NOTIFICATION_BUFFER_CAPACITY
47     * @see MX4JRemoteConstants#NOTIFICATION_PURGE_DISTANCE
48     */

49    public DefaultRemoteNotificationServerHandler(Map JavaDoc environment)
50    {
51       listener = new ServerListener();
52       buffer = new NotificationBuffer(environment);
53    }
54
55    public Integer JavaDoc generateListenerID(ObjectName JavaDoc name, NotificationFilter JavaDoc filter)
56    {
57       synchronized (DefaultRemoteNotificationServerHandler.class)
58       {
59          return new Integer JavaDoc(++listenerID);
60       }
61    }
62
63    public NotificationListener JavaDoc getServerNotificationListener()
64    {
65       return listener;
66    }
67
68    public void addNotificationListener(Integer JavaDoc id, NotificationTuple tuple)
69    {
70       if (closed) return;
71       synchronized (tuples)
72       {
73          tuples.put(id, tuple);
74       }
75    }
76
77    public NotificationTuple removeNotificationListener(Integer JavaDoc id)
78    {
79       if (closed) return null;
80       synchronized (tuples)
81       {
82          return (NotificationTuple)tuples.remove(id);
83       }
84    }
85
86    public NotificationResult JavaDoc fetchNotifications(long sequenceNumber, int maxNotifications, long timeout) throws IOException JavaDoc
87    {
88       if (closed) throw new IOException JavaDoc("RemoteNotificationServerHandler is closed");
89       return buffer.getNotifications(sequenceNumber, maxNotifications, timeout);
90    }
91
92    public NotificationTuple[] close()
93    {
94       Logger logger = getLogger();
95       closed = true;
96       stopWaitingForNotifications(buffer);
97       synchronized (tuples)
98       {
99          NotificationTuple[] result = (NotificationTuple[])tuples.values().toArray(new NotificationTuple[tuples.size()]);
100          tuples.clear();
101          if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("RemoteNotificationServerHandler closed, returning: " + Arrays.asList(result));
102          return result;
103       }
104    }
105
106    /**
107     * When a connection is closed, it may be possible that a client RMI call is waiting in
108     * {@link #waitForNotifications}, so here we wake it up, letting the thread return to the
109     * client and free resources on client's side.
110     *
111     * @param lock The object on which {@link #notifyAll} should be called
112     */

113    private void stopWaitingForNotifications(Object JavaDoc lock)
114    {
115       synchronized (lock)
116       {
117          lock.notifyAll();
118       }
119    }
120
121    /**
122     * Called when there are no notifications to send to the client.
123     * It is guaranteed that no notification can be added before this method waits on the given lock.
124     * It should wait on the given lock for the specified timeout, and return true
125     * to send notifications (if no notifications arrived, an empty notification array
126     * will be returned to the client), or false if no notifications should be sent to
127     * the client.
128     *
129     * @param lock The object on which {@link #wait} should be called
130     * @param timeout The amount of time to wait (guaranteed to be strictly greater than 0)
131     */

132    protected boolean waitForNotifications(Object JavaDoc lock, long timeout)
133    {
134       Logger logger = getLogger();
135       long start = 0;
136       if (logger.isEnabledFor(Logger.DEBUG))
137       {
138          logger.debug("Waiting for notifications " + timeout + " ms");
139          start = System.currentTimeMillis();
140       }
141
142       synchronized (lock)
143       {
144          try
145          {
146             lock.wait(timeout);
147          }
148          catch (InterruptedException JavaDoc x)
149          {
150             Thread.currentThread().interrupt();
151          }
152       }
153
154       if (logger.isEnabledFor(Logger.DEBUG))
155       {
156          long elapsed = System.currentTimeMillis() - start;
157          logger.debug("Waited for notifications " + elapsed + " ms");
158       }
159
160       return true;
161    }
162
163    /**
164     * This method filters the given notification array and returns a possibly smaller array containing
165     * only notifications that passed successfully the filtering.
166     * Default behavior is no filtering, but subclasses may choose to change this bahavior.
167     * For example, for RMI, one can assure that all notifications are truly serializable, and log those
168     * that are not.
169     */

170    protected TargetedNotification JavaDoc[] filterNotifications(TargetedNotification JavaDoc[] notifications)
171    {
172       return notifications;
173    }
174
175    private void addNotification(Integer JavaDoc id, Notification JavaDoc notification)
176    {
177       buffer.add(new TargetedNotification JavaDoc(notification, id));
178    }
179
180    protected Logger getLogger()
181    {
182       return Log.getLogger(getClass().getName());
183    }
184
185    private class ServerListener implements NotificationListener JavaDoc
186    {
187       public void handleNotification(Notification JavaDoc notification, Object JavaDoc handback)
188       {
189          Integer JavaDoc id = (Integer JavaDoc)handback;
190          addNotification(id, notification);
191       }
192    }
193
194    private class NotificationBuffer
195    {
196       private final List JavaDoc notifications = new LinkedList JavaDoc();
197       private int maxCapacity;
198       private int purgeDistance;
199       private long firstSequence;
200       private long lastSequence;
201       private long lowestExpectedSequence = -1;
202
203       private NotificationBuffer(Map JavaDoc environment)
204       {
205          if (environment != null)
206          {
207             try
208             {
209                Integer JavaDoc maxCapacityInteger = (Integer JavaDoc)environment.get(MX4JRemoteConstants.NOTIFICATION_BUFFER_CAPACITY);
210                if (maxCapacityInteger != null) maxCapacity = maxCapacityInteger.intValue();
211             }
212             catch (Exception JavaDoc ignored)
213             {
214             }
215
216             try
217             {
218                Integer JavaDoc purgeDistanceInteger = (Integer JavaDoc)environment.get(MX4JRemoteConstants.NOTIFICATION_PURGE_DISTANCE);
219                if (purgeDistanceInteger != null) purgeDistance = purgeDistanceInteger.intValue();
220             }
221             catch (Exception JavaDoc ignored)
222             {
223             }
224          }
225          if (maxCapacity <= 0) maxCapacity = 1024;
226          if (purgeDistance <= 0) purgeDistance = 128;
227       }
228
229       private int getSize()
230       {
231          synchronized (this)
232          {
233             return notifications.size();
234          }
235       }
236
237       private void add(TargetedNotification JavaDoc notification)
238       {
239          Logger logger = getLogger();
240          synchronized (this)
241          {
242             if (notifications.size() == maxCapacity)
243             {
244                if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Notification buffer full: " + this);
245                removeRange(0, 1);
246             }
247             notifications.add(notification);
248             ++lastSequence;
249             if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Notification added to buffer: " + this);
250             notifyAll();
251          }
252       }
253
254       private void removeRange(int start, int end)
255       {
256          synchronized (this)
257          {
258             notifications.subList(start, end).clear();
259             firstSequence += end - start;
260          }
261       }
262
263       private long getFirstSequenceNumber()
264       {
265          synchronized (this)
266          {
267             return firstSequence;
268          }
269       }
270
271       private long getLastSequenceNumber()
272       {
273          synchronized (this)
274          {
275             return lastSequence;
276          }
277       }
278
279       private NotificationResult JavaDoc getNotifications(long sequenceNumber, int maxNotifications, long timeout)
280       {
281          Logger logger = getLogger();
282          synchronized (this)
283          {
284             NotificationResult JavaDoc result = null;
285             int size = 0;
286             if (sequenceNumber < 0)
287             {
288                // We loose the notifications between addNotificationListener() and fetchNotifications(), but c'est la vie.
289
long sequence = getLastSequenceNumber();
290                size = new Long JavaDoc(sequence + 1).intValue();
291                result = new NotificationResult JavaDoc(getFirstSequenceNumber(), sequence, new TargetedNotification JavaDoc[0]);
292                if (lowestExpectedSequence < 0) lowestExpectedSequence = sequence;
293                if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("First fetchNotification call: " + this + ", returning " + result);
294             }
295             else
296             {
297                long firstSequence = getFirstSequenceNumber();
298
299                int losts = 0;
300                int start = new Long JavaDoc(sequenceNumber - firstSequence).intValue();
301                // In the time between 2 fetches the buffer may have overflew, so that start < 0.
302
// It simply mean that we send the first notification we have (start = 0),
303
// and the client will emit a notification lost event.
304
if (start < 0)
305                {
306                   losts = -start;
307                   start = 0;
308                }
309
310                List JavaDoc sublist = null;
311                boolean send = false;
312                while (size == 0)
313                {
314                   int end = notifications.size();
315                   if (end - start > maxNotifications) end = start + maxNotifications;
316
317                   sublist = notifications.subList(start, end);
318                   size = sublist.size();
319
320                   if (closed || send) break;
321
322                   if (size == 0)
323                   {
324                      if (timeout <= 0) break;
325                      if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("No notifications to send, waiting " + timeout + " ms");
326
327                      // We wait for notifications to arrive. Since we release the lock on the buffer
328
// other threads can modify it. To avoid ConcurrentModificationException we compute
329
// again the sublist by coming up back to the while statement
330
send = waitForNotifications(this, timeout);
331                   }
332                }
333
334                TargetedNotification JavaDoc[] notifications = (TargetedNotification JavaDoc[])sublist.toArray(new TargetedNotification JavaDoc[size]);
335                notifications = filterNotifications(notifications);
336                result = new NotificationResult JavaDoc(firstSequence, sequenceNumber + losts + size, notifications);
337                if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Non-first fetchNotification call: " + this + ", returning " + result);
338
339                int purged = purgeNotifications(sequenceNumber, size);
340                if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Purged " + purged + " notifications: " + this);
341             }
342             return result;
343          }
344       }
345
346       private int purgeNotifications(long sequenceNumber, int size)
347       {
348          // Record the lowest expected sequence number sent by the client.
349
// New clients will always have an initial big sequence number
350
// (they're initialized with getLastSequenceNumber()), while old
351
// clients can have lesser sequence numbers.
352
// Here we record the lesser of these sequence numbers, that is the
353
// sequence number of the oldest notification any client may ever ask.
354
// This way we can purge old notifications that have already been
355
// delivered to clients.
356

357          // The worst case is when a client has a long interval between fetchNotifications()
358
// calls, and another client has a short interval. The lowestExpectedSequence will
359
// grow with the second client, until a purge happens, so the first client can
360
// loose notifications. By tuning appropriately the purgeDistance and the interval
361
// between fetchNotifications() calls, it should never happen.
362

363          int result = 0;
364          synchronized (this)
365          {
366             if (sequenceNumber <= lowestExpectedSequence)
367             {
368                long lowest = Math.min(lowestExpectedSequence, sequenceNumber);
369
370                long firstSequence = getFirstSequenceNumber();
371                if (lowest - firstSequence > purgeDistance)
372                {
373                   // Purge only half of the old notifications, for safety
374
int purgeSize = purgeDistance >> 1;
375                   removeRange(0, purgeSize);
376                   result = purgeSize;
377                }
378
379                long expected = Math.max(sequenceNumber + size, firstSequence);
380                lowestExpectedSequence = expected;
381             }
382          }
383          return result;
384       }
385
386       public String JavaDoc toString()
387       {
388          StringBuffer JavaDoc buffer = new StringBuffer JavaDoc("NotificationBuffer@");
389          buffer.append(Integer.toHexString(hashCode())).append("[");
390          buffer.append("first=").append(getFirstSequenceNumber()).append(", ");
391          buffer.append("last=").append(getLastSequenceNumber()).append(", ");
392          buffer.append("size=").append(getSize()).append(", ");
393          buffer.append("lowestExpected=").append(lowestExpectedSequence).append(", ");
394          buffer.append("maxCapacity=").append(maxCapacity).append(", ");
395          buffer.append("purgeDistance=").append(purgeDistance).append("]");
396          return buffer.toString();
397       }
398    }
399 }
400
Popular Tags