KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > mx > remoting > MBeanNotificationCache


1 /***************************************
2  * *
3  * JBoss: The OpenSource J2EE WebOS *
4  * *
5  * Distributable under LGPL license. *
6  * See terms of license at gnu.org. *
7  * *
8  ***************************************/

9 package org.jboss.mx.remoting;
10
11 import java.util.ArrayList JavaDoc;
12 import java.util.HashMap JavaDoc;
13 import java.util.Iterator JavaDoc;
14 import java.util.List JavaDoc;
15 import java.util.Map JavaDoc;
16 import javax.management.InstanceNotFoundException JavaDoc;
17 import javax.management.ListenerNotFoundException JavaDoc;
18 import javax.management.MBeanServer JavaDoc;
19 import javax.management.Notification JavaDoc;
20 import javax.management.NotificationFilter JavaDoc;
21 import javax.management.NotificationListener JavaDoc;
22 import javax.management.ObjectName JavaDoc;
23 import org.jboss.logging.Logger;
24 import org.jboss.remoting.Client;
25 import org.jboss.remoting.ConnectionFailedException;
26 import org.jboss.remoting.InvokerLocator;
27 import org.jboss.remoting.InvokerRegistry;
28 import org.jboss.remoting.ServerInvoker;
29 import org.jboss.remoting.Subsystem;
30 import org.jboss.remoting.invocation.NameBasedInvocation;
31 import org.jboss.remoting.network.NetworkNotification;
32 import org.jboss.remoting.network.NetworkRegistryFinder;
33 import org.jboss.remoting.transport.ClientInvoker;
34
35 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
36
37 /**
38  * MBeanNotificationCache is an object that queues all the server side JMX notifications on behalf
39  * of a client invoker.
40  *
41  * @author <a HREF="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
42  * @version $Revision: 30251 $
43  */

44 public class MBeanNotificationCache implements NotificationListener JavaDoc
45 {
46    private static final Logger log = Logger.getLogger(MBeanNotificationCache.class.getName());
47    private final MBeanServer JavaDoc server;
48    private final List JavaDoc listeners = new ArrayList JavaDoc();
49    private final Map JavaDoc queue = new HashMap JavaDoc();
50    private final ObjectName JavaDoc networkRegistry;
51    private final ServerInvoker serverInvoker;
52    private final String JavaDoc localServerId;
53
54    public MBeanNotificationCache(ServerInvoker invoker, MBeanServer JavaDoc server)
55          throws Exception JavaDoc
56    {
57       this.server = server;
58       this.serverInvoker = invoker;
59       this.localServerId = JMXUtil.getServerId(server);
60
61       networkRegistry = NetworkRegistryFinder.find(server);
62       if(networkRegistry == null)
63       {
64          throw new Exception JavaDoc("Couldn't find the required NetworkRegistryMBean in this MBeanServer");
65       }
66       // add ourself as a listener for detection failed events
67
server.addNotificationListener(networkRegistry, this, null, this);
68    }
69
70    public void handleNotification(Notification JavaDoc notification, Object JavaDoc o)
71    {
72       if(notification instanceof NetworkNotification && o != null && this.equals(o))
73       {
74          String JavaDoc type = notification.getType();
75          if(type.equals(NetworkNotification.SERVER_REMOVED))
76          {
77             // server has failed
78
NetworkNotification nn = (NetworkNotification) notification;
79             String JavaDoc sessionId = nn.getIdentity().getJMXId();
80             List JavaDoc failed = new ArrayList JavaDoc();
81             synchronized(listeners)
82             {
83                Iterator JavaDoc iter = listeners.iterator();
84                while(iter.hasNext())
85                {
86                   Listener JavaDoc listener = (Listener JavaDoc) iter.next();
87                   if(sessionId.equals(listener.sessionId))
88                   {
89                      // just put into a list, so we only sync min time
90
failed.add(listener);
91                   }
92                }
93             }
94             if(failed.isEmpty() == false)
95             {
96                // walk through and remove each listener that has failed
97
Iterator JavaDoc iter = failed.iterator();
98                while(iter.hasNext())
99                {
100                   Listener JavaDoc listener = (Listener JavaDoc) iter.next();
101                   if(log.isTraceEnabled())
102                   {
103                      log.trace("++ Removed orphaned listener because server failed: " + nn.getIdentity());
104                   }
105                   try
106                   {
107                      removeNotificationListener(listener.locator, listener.sessionId, listener.objectName, listener.handback);
108                   }
109                   catch(Exception JavaDoc ig)
110                   {
111                   }
112                   listener = null;
113                }
114                failed = null;
115             }
116             synchronized(queue)
117             {
118                queue.remove(sessionId);
119             }
120          }
121       }
122    }
123
124    public synchronized void destroy()
125    {
126       if(log.isTraceEnabled())
127       {
128          log.trace("destroy call on notification cache");
129       }
130       synchronized(listeners)
131       {
132          Iterator JavaDoc iter = listeners.iterator();
133          while(iter.hasNext())
134          {
135             Listener JavaDoc l = (Listener JavaDoc) iter.next();
136             try
137             {
138                removeNotificationListener(l.locator, l.sessionId, l.objectName, l.handback);
139             }
140             catch(Exception JavaDoc e)
141             {
142             }
143             // remove will remove from the listeners list
144
}
145       }
146       synchronized(queue)
147       {
148          queue.clear();
149       }
150       try
151       {
152          server.removeNotificationListener(networkRegistry, this);
153       }
154       catch(Exception JavaDoc ig)
155       {
156       }
157    }
158
159    public void addNotificationListener(InvokerLocator clientLocator, String JavaDoc sessionId, ObjectName JavaDoc objectName, NotificationFilter JavaDoc filter, Object JavaDoc handback)
160          throws InstanceNotFoundException JavaDoc
161    {
162       if(log.isTraceEnabled())
163       {
164          log.trace("remote notification listener added for client [" + clientLocator + "] on objectName [" + objectName + "] and mbeanServerId [" + sessionId + "], filter: " + filter + ", handback: " + handback);
165       }
166       Listener JavaDoc l = new Listener JavaDoc(clientLocator, sessionId, objectName, filter, handback);
167       synchronized(this.listeners)
168       {
169          if(this.listeners.contains(l) == false)
170          {
171             this.listeners.add(l);
172             server.addNotificationListener(objectName, l, filter, handback);
173          }
174       }
175    }
176
177    public void removeNotificationListener(InvokerLocator clientLocator, String JavaDoc sessionId, ObjectName JavaDoc objectName, Object JavaDoc handback)
178          throws InstanceNotFoundException JavaDoc, ListenerNotFoundException JavaDoc
179    {
180       if(log.isTraceEnabled())
181       {
182          log.trace("removeNotificationListener called with clientLocator: " + clientLocator + ", sessionId: " + sessionId + ", objectName: " + objectName);
183       }
184       synchronized(this.listeners)
185       {
186          Iterator JavaDoc iter = listeners.iterator();
187          while(iter.hasNext())
188          {
189             Listener JavaDoc l = (Listener JavaDoc) iter.next();
190             if(l.locator.equals(clientLocator) && l.objectName.equals(objectName) && l.sessionId.equals(sessionId))
191             {
192                if(log.isTraceEnabled())
193                {
194                   log.trace("remote notification listener removed for client [" + clientLocator + "] on objectName [" + objectName + "] and MBeanServerId [" + sessionId + "]");
195                }
196                iter.remove();
197                server.removeNotificationListener(objectName, l, l.filter, handback);
198                l.destroy();
199                l = null;
200             }
201          }
202       }
203    }
204
205    /**
206     * pull notifications for a given sessionId and return the queue or null if none pending
207     *
208     * @param sessionId
209     * @return
210     */

211    public NotificationQueue getNotifications(String JavaDoc sessionId)
212    {
213       synchronized(queue)
214       {
215          // remove the queue object each time, if it exists, the
216
// listener will re-create a new one on each notification
217
return (NotificationQueue) queue.remove(sessionId);
218       }
219    }
220
221    private final class Listener implements NotificationListener JavaDoc
222    {
223       final ObjectName JavaDoc objectName;
224       final Object JavaDoc handback;
225       final NotificationFilter JavaDoc filter;
226       final InvokerLocator locator;
227       final String JavaDoc sessionId;
228       private ClientInvoker clientInvoker;
229       private Client client;
230       private boolean asyncSend = false;
231       private LinkedQueue asyncQueue;
232       private int counter = 0;
233       private BiDirectionClientNotificationSender biDirectionalSender;
234
235       Listener(InvokerLocator locator, String JavaDoc sessionId, ObjectName JavaDoc objectName, NotificationFilter JavaDoc filter, Object JavaDoc handback)
236       {
237          this.objectName = objectName;
238          this.filter = filter;
239          this.locator = locator;
240          this.sessionId = sessionId;
241          this.handback = handback;
242
243
244          if(serverInvoker.isTransportBiDirectional())
245          {
246             // attempt connection
247
connectAsync();
248          }
249       }
250
251       synchronized void destroy()
252       {
253          if(log.isTraceEnabled())
254          {
255             log.trace("destroy called on client [" + locator + "], session id [" + sessionId + "]");
256          }
257          try
258          {
259             removeNotificationListener(locator, sessionId, objectName, handback);
260          }
261          catch(Throwable JavaDoc e)
262          {
263          }
264          if(biDirectionalSender != null)
265          {
266             biDirectionalSender.running = false;
267             biDirectionalSender.interrupt();
268             biDirectionalSender = null;
269             while(asyncQueue != null && asyncQueue.isEmpty() == false)
270             {
271                try
272                {
273                   asyncQueue.take();
274                }
275                catch(InterruptedException JavaDoc ex)
276                {
277                   break;
278                }
279             }
280             asyncQueue = null;
281          }
282          if(client != null)
283          {
284             try
285             {
286                client.disconnect();
287             }
288             finally
289             {
290                client = null;
291             }
292          }
293       }
294
295       private void connectAsync()
296       {
297          try
298          {
299             if(log.isTraceEnabled())
300             {
301                log.trace("attempting an bi-directional connection back to client [" + locator + "], server id [" + sessionId + "]");
302             }
303             // attempt connection back
304
clientInvoker = InvokerRegistry.createClientInvoker(locator);
305             clientInvoker.connect();
306             client = new Client(Thread.currentThread().getContextClassLoader(), clientInvoker, Subsystem.JMX);
307             asyncQueue = new LinkedQueue();
308             biDirectionalSender = new BiDirectionClientNotificationSender();
309             biDirectionalSender.start();
310             asyncSend = true;
311          }
312          catch(Throwable JavaDoc e)
313          {
314             log.debug("attempted a bi-directional connection back to client [" + locator + "], but it failed", e);
315          }
316       }
317
318       private final class BiDirectionClientNotificationSender extends Thread JavaDoc
319       {
320          private boolean running = true;
321
322          public void run()
323          {
324             NotificationQueue nq = new NotificationQueue(sessionId);
325             int count = 0;
326             long lastTx = 0;
327             while(running)
328             {
329                try
330                {
331                   while(count < 10 && !asyncQueue.isEmpty())
332                   {
333                      // as long as we have entries w/o blocking, add them
334
NotificationEntry ne = (NotificationEntry) asyncQueue.take();
335                      nq.add(ne);
336                      count++;
337                      counter++;
338                   }
339                   // take up to 10 notifications before forcing a send ,or if we block for
340
// more than 2 secs
341
if((count > 10 || asyncQueue.isEmpty() || System.currentTimeMillis() - lastTx >= 2000) && nq.isEmpty() == false)
342                   {
343                      // send back to client
344
try
345                      {
346                         if(log.isTraceEnabled())
347                         {
348                            log.trace("sending notification queue [" + nq + "] to client [" + locator + "] with sessionId [" + sessionId + "], counter=" + counter + " ,count=" + count);
349                         }
350                         lastTx = System.currentTimeMillis();
351                         client.setSessionId(localServerId);
352                         client.invoke(new NameBasedInvocation("$NOTIFICATIONS$",
353                                                               new Object JavaDoc[]{nq},
354                                                               new String JavaDoc[]{NotificationQueue.class.getName()}),
355                                       null);
356                      }
357                      catch(Throwable JavaDoc t)
358                      {
359                         if(t instanceof ConnectionFailedException)
360                         {
361                            if(log.isTraceEnabled())
362                            {
363                               log.trace("Client is dead during invocation");
364                            }
365                            Listener.this.destroy();
366                            break;
367                         }
368                         else
369                         {
370                            log.warn("Error sending async notifications to client: " + locator, t);
371                         }
372                      }
373                      finally
374                      {
375                         // clear the items in the queue, if any
376
nq.clear();
377                         count = 0;
378                      }
379                   }
380                   else if(asyncQueue.isEmpty())
381                   {
382                      // this will block
383
if(log.isTraceEnabled())
384                      {
385                         log.trace("blocking on more notifications to arrive");
386                      }
387                      NotificationEntry ne = (NotificationEntry) asyncQueue.take();
388                      nq.add(ne);
389                      count += 1;
390                      counter++;
391                   }
392                }
393                catch(InterruptedException JavaDoc ex)
394                {
395                   break;
396                }
397             }
398          }
399       }
400
401       public void handleNotification(Notification JavaDoc notification, Object JavaDoc o)
402       {
403          if(log.isTraceEnabled())
404          {
405             log.trace("(" + (asyncSend ? "async" : "polling") + ") notification received ..." + notification + " for client [" + locator + "]");
406          }
407          if(asyncSend == false)
408          {
409             // not async, we are going to queue for polling ...
410
NotificationQueue q = null;
411
412             synchronized(queue)
413             {
414                // get the queue
415
q = (NotificationQueue) queue.get(sessionId);
416                if(q == null)
417                {
418                   // doesn't exist, create it
419
q = new NotificationQueue(sessionId);
420                   queue.put(sessionId, q);
421                }
422                if(log.isTraceEnabled())
423                {
424                   log.trace("added notification to polling queue: " + notification + " for sessionId: " + sessionId);
425                }
426                q.add(new NotificationEntry(notification, handback));
427             }
428          }
429          else
430          {
431             if(asyncQueue != null)
432             {
433                // this is a bi-directional client, send it immediately
434
try
435                {
436                   asyncQueue.put(new NotificationEntry(notification, handback));
437                }
438                catch(InterruptedException JavaDoc ie)
439                {
440
441                }
442             }
443          }
444       }
445    }
446 }
447
Popular Tags