1 9 package org.jboss.mx.remoting; 10 11 import java.util.ArrayList ; 12 import java.util.HashMap ; 13 import java.util.Iterator ; 14 import java.util.List ; 15 import java.util.Map ; 16 import javax.management.InstanceNotFoundException ; 17 import javax.management.ListenerNotFoundException ; 18 import javax.management.MBeanServer ; 19 import javax.management.Notification ; 20 import javax.management.NotificationFilter ; 21 import javax.management.NotificationListener ; 22 import javax.management.ObjectName ; 23 import org.jboss.logging.Logger; 24 import org.jboss.remoting.Client; 25 import org.jboss.remoting.ConnectionFailedException; 26 import org.jboss.remoting.InvokerLocator; 27 import org.jboss.remoting.InvokerRegistry; 28 import org.jboss.remoting.ServerInvoker; 29 import org.jboss.remoting.Subsystem; 30 import org.jboss.remoting.invocation.NameBasedInvocation; 31 import org.jboss.remoting.network.NetworkNotification; 32 import org.jboss.remoting.network.NetworkRegistryFinder; 33 import org.jboss.remoting.transport.ClientInvoker; 34 35 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 36 37 44 public class MBeanNotificationCache implements NotificationListener 45 { 46 private static final Logger log = Logger.getLogger(MBeanNotificationCache.class.getName()); 47 private final MBeanServer server; 48 private final List listeners = new ArrayList (); 49 private final Map queue = new HashMap (); 50 private final ObjectName networkRegistry; 51 private final ServerInvoker serverInvoker; 52 private final String localServerId; 53 54 public MBeanNotificationCache(ServerInvoker invoker, MBeanServer server) 55 throws Exception 56 { 57 this.server = server; 58 this.serverInvoker = invoker; 59 this.localServerId = JMXUtil.getServerId(server); 60 61 networkRegistry = NetworkRegistryFinder.find(server); 62 if(networkRegistry == null) 63 { 64 throw new Exception ("Couldn't find the required NetworkRegistryMBean in this MBeanServer"); 65 } 66 server.addNotificationListener(networkRegistry, this, null, this); 68 } 69 70 public void handleNotification(Notification notification, Object o) 71 { 72 if(notification instanceof NetworkNotification && o != null && this.equals(o)) 73 { 74 String type = notification.getType(); 75 if(type.equals(NetworkNotification.SERVER_REMOVED)) 76 { 77 NetworkNotification nn = (NetworkNotification) notification; 79 String sessionId = nn.getIdentity().getJMXId(); 80 List failed = new ArrayList (); 81 synchronized(listeners) 82 { 83 Iterator iter = listeners.iterator(); 84 while(iter.hasNext()) 85 { 86 Listener listener = (Listener ) iter.next(); 87 if(sessionId.equals(listener.sessionId)) 88 { 89 failed.add(listener); 91 } 92 } 93 } 94 if(failed.isEmpty() == false) 95 { 96 Iterator iter = failed.iterator(); 98 while(iter.hasNext()) 99 { 100 Listener listener = (Listener ) iter.next(); 101 if(log.isTraceEnabled()) 102 { 103 log.trace("++ Removed orphaned listener because server failed: " + nn.getIdentity()); 104 } 105 try 106 { 107 removeNotificationListener(listener.locator, listener.sessionId, listener.objectName, listener.handback); 108 } 109 catch(Exception ig) 110 { 111 } 112 listener = null; 113 } 114 failed = null; 115 } 116 synchronized(queue) 117 { 118 queue.remove(sessionId); 119 } 120 } 121 } 122 } 123 124 public synchronized void destroy() 125 { 126 if(log.isTraceEnabled()) 127 { 128 log.trace("destroy call on notification cache"); 129 } 130 synchronized(listeners) 131 { 132 Iterator iter = listeners.iterator(); 133 while(iter.hasNext()) 134 { 135 Listener l = (Listener ) iter.next(); 136 try 137 { 138 removeNotificationListener(l.locator, l.sessionId, l.objectName, l.handback); 139 } 140 catch(Exception e) 141 { 142 } 143 } 145 } 146 synchronized(queue) 147 { 148 queue.clear(); 149 } 150 try 151 { 152 server.removeNotificationListener(networkRegistry, this); 153 } 154 catch(Exception ig) 155 { 156 } 157 } 158 159 public void addNotificationListener(InvokerLocator clientLocator, String sessionId, ObjectName objectName, NotificationFilter filter, Object handback) 160 throws InstanceNotFoundException 161 { 162 if(log.isTraceEnabled()) 163 { 164 log.trace("remote notification listener added for client [" + clientLocator + "] on objectName [" + objectName + "] and mbeanServerId [" + sessionId + "], filter: " + filter + ", handback: " + handback); 165 } 166 Listener l = new Listener (clientLocator, sessionId, objectName, filter, handback); 167 synchronized(this.listeners) 168 { 169 if(this.listeners.contains(l) == false) 170 { 171 this.listeners.add(l); 172 server.addNotificationListener(objectName, l, filter, handback); 173 } 174 } 175 } 176 177 public void removeNotificationListener(InvokerLocator clientLocator, String sessionId, ObjectName objectName, Object handback) 178 throws InstanceNotFoundException , ListenerNotFoundException 179 { 180 if(log.isTraceEnabled()) 181 { 182 log.trace("removeNotificationListener called with clientLocator: " + clientLocator + ", sessionId: " + sessionId + ", objectName: " + objectName); 183 } 184 synchronized(this.listeners) 185 { 186 Iterator iter = listeners.iterator(); 187 while(iter.hasNext()) 188 { 189 Listener l = (Listener ) iter.next(); 190 if(l.locator.equals(clientLocator) && l.objectName.equals(objectName) && l.sessionId.equals(sessionId)) 191 { 192 if(log.isTraceEnabled()) 193 { 194 log.trace("remote notification listener removed for client [" + clientLocator + "] on objectName [" + objectName + "] and MBeanServerId [" + sessionId + "]"); 195 } 196 iter.remove(); 197 server.removeNotificationListener(objectName, l, l.filter, handback); 198 l.destroy(); 199 l = null; 200 } 201 } 202 } 203 } 204 205 211 public NotificationQueue getNotifications(String sessionId) 212 { 213 synchronized(queue) 214 { 215 return (NotificationQueue) queue.remove(sessionId); 218 } 219 } 220 221 private final class Listener implements NotificationListener 222 { 223 final ObjectName objectName; 224 final Object handback; 225 final NotificationFilter filter; 226 final InvokerLocator locator; 227 final String sessionId; 228 private ClientInvoker clientInvoker; 229 private Client client; 230 private boolean asyncSend = false; 231 private LinkedQueue asyncQueue; 232 private int counter = 0; 233 private BiDirectionClientNotificationSender biDirectionalSender; 234 235 Listener(InvokerLocator locator, String sessionId, ObjectName objectName, NotificationFilter filter, Object handback) 236 { 237 this.objectName = objectName; 238 this.filter = filter; 239 this.locator = locator; 240 this.sessionId = sessionId; 241 this.handback = handback; 242 243 244 if(serverInvoker.isTransportBiDirectional()) 245 { 246 connectAsync(); 248 } 249 } 250 251 synchronized void destroy() 252 { 253 if(log.isTraceEnabled()) 254 { 255 log.trace("destroy called on client [" + locator + "], session id [" + sessionId + "]"); 256 } 257 try 258 { 259 removeNotificationListener(locator, sessionId, objectName, handback); 260 } 261 catch(Throwable e) 262 { 263 } 264 if(biDirectionalSender != null) 265 { 266 biDirectionalSender.running = false; 267 biDirectionalSender.interrupt(); 268 biDirectionalSender = null; 269 while(asyncQueue != null && asyncQueue.isEmpty() == false) 270 { 271 try 272 { 273 asyncQueue.take(); 274 } 275 catch(InterruptedException ex) 276 { 277 break; 278 } 279 } 280 asyncQueue = null; 281 } 282 if(client != null) 283 { 284 try 285 { 286 client.disconnect(); 287 } 288 finally 289 { 290 client = null; 291 } 292 } 293 } 294 295 private void connectAsync() 296 { 297 try 298 { 299 if(log.isTraceEnabled()) 300 { 301 log.trace("attempting an bi-directional connection back to client [" + locator + "], server id [" + sessionId + "]"); 302 } 303 clientInvoker = InvokerRegistry.createClientInvoker(locator); 305 clientInvoker.connect(); 306 client = new Client(Thread.currentThread().getContextClassLoader(), clientInvoker, Subsystem.JMX); 307 asyncQueue = new LinkedQueue(); 308 biDirectionalSender = new BiDirectionClientNotificationSender(); 309 biDirectionalSender.start(); 310 asyncSend = true; 311 } 312 catch(Throwable e) 313 { 314 log.debug("attempted a bi-directional connection back to client [" + locator + "], but it failed", e); 315 } 316 } 317 318 private final class BiDirectionClientNotificationSender extends Thread 319 { 320 private boolean running = true; 321 322 public void run() 323 { 324 NotificationQueue nq = new NotificationQueue(sessionId); 325 int count = 0; 326 long lastTx = 0; 327 while(running) 328 { 329 try 330 { 331 while(count < 10 && !asyncQueue.isEmpty()) 332 { 333 NotificationEntry ne = (NotificationEntry) asyncQueue.take(); 335 nq.add(ne); 336 count++; 337 counter++; 338 } 339 if((count > 10 || asyncQueue.isEmpty() || System.currentTimeMillis() - lastTx >= 2000) && nq.isEmpty() == false) 342 { 343 try 345 { 346 if(log.isTraceEnabled()) 347 { 348 log.trace("sending notification queue [" + nq + "] to client [" + locator + "] with sessionId [" + sessionId + "], counter=" + counter + " ,count=" + count); 349 } 350 lastTx = System.currentTimeMillis(); 351 client.setSessionId(localServerId); 352 client.invoke(new NameBasedInvocation("$NOTIFICATIONS$", 353 new Object []{nq}, 354 new String []{NotificationQueue.class.getName()}), 355 null); 356 } 357 catch(Throwable t) 358 { 359 if(t instanceof ConnectionFailedException) 360 { 361 if(log.isTraceEnabled()) 362 { 363 log.trace("Client is dead during invocation"); 364 } 365 Listener.this.destroy(); 366 break; 367 } 368 else 369 { 370 log.warn("Error sending async notifications to client: " + locator, t); 371 } 372 } 373 finally 374 { 375 nq.clear(); 377 count = 0; 378 } 379 } 380 else if(asyncQueue.isEmpty()) 381 { 382 if(log.isTraceEnabled()) 384 { 385 log.trace("blocking on more notifications to arrive"); 386 } 387 NotificationEntry ne = (NotificationEntry) asyncQueue.take(); 388 nq.add(ne); 389 count += 1; 390 counter++; 391 } 392 } 393 catch(InterruptedException ex) 394 { 395 break; 396 } 397 } 398 } 399 } 400 401 public void handleNotification(Notification notification, Object o) 402 { 403 if(log.isTraceEnabled()) 404 { 405 log.trace("(" + (asyncSend ? "async" : "polling") + ") notification received ..." + notification + " for client [" + locator + "]"); 406 } 407 if(asyncSend == false) 408 { 409 NotificationQueue q = null; 411 412 synchronized(queue) 413 { 414 q = (NotificationQueue) queue.get(sessionId); 416 if(q == null) 417 { 418 q = new NotificationQueue(sessionId); 420 queue.put(sessionId, q); 421 } 422 if(log.isTraceEnabled()) 423 { 424 log.trace("added notification to polling queue: " + notification + " for sessionId: " + sessionId); 425 } 426 q.add(new NotificationEntry(notification, handback)); 427 } 428 } 429 else 430 { 431 if(asyncQueue != null) 432 { 433 try 435 { 436 asyncQueue.put(new NotificationEntry(notification, handback)); 437 } 438 catch(InterruptedException ie) 439 { 440 441 } 442 } 443 } 444 } 445 } 446 } 447 | Popular Tags |