1 9 package org.jboss.mx.remoting; 10 11 import java.lang.reflect.InvocationHandler ; 12 import java.lang.reflect.Method ; 13 import java.lang.reflect.Proxy ; 14 import java.lang.reflect.UndeclaredThrowableException ; 15 import java.util.HashMap ; 16 import java.util.HashSet ; 17 import java.util.Iterator ; 18 import java.util.Map ; 19 import java.util.Set ; 20 import java.util.Timer ; 21 import java.util.TimerTask ; 22 import javax.management.MBeanException ; 23 import javax.management.MBeanServer ; 24 import javax.management.NotificationFilter ; 25 import javax.management.NotificationListener ; 26 import javax.management.ObjectName ; 27 import javax.management.ReflectionException ; 28 import org.jboss.logging.Logger; 29 import org.jboss.remoting.Client; 30 import org.jboss.remoting.ConnectionFailedException; 31 import org.jboss.remoting.InvokerLocator; 32 import org.jboss.remoting.Subsystem; 33 import org.jboss.remoting.invocation.NameBasedInvocation; 34 35 42 public class MBeanServerClientInvokerProxy implements InvocationHandler 43 { 44 private static final Logger log = Logger.getLogger(MBeanServerClientInvokerProxy.class.getName()); 45 46 private final String serverId; 47 private final String localJmxId; 48 private final transient InvokerLocator locator; 49 private final transient Client client; 50 private final transient Map paramMap = new HashMap (); 51 private NotificationPoller poller = new NotificationPoller(); 52 private Timer pollTimer = new Timer (true); 53 private static transient Map proxies = new HashMap (); 54 private transient Set listeners = new HashSet (); 55 56 private MBeanServerClientInvokerProxy(ClassLoader cl, InvokerLocator invoker, String localJmxId, String id) 57 throws Exception 58 { 59 this.localJmxId = localJmxId; 60 this.serverId = id; 61 if(this.serverId == null) 62 { 63 throw new IllegalArgumentException ("MBeanServer ID not found - make sure the NetworkRegistry MBean is running"); 64 } 65 this.client = new Client(cl, invoker, Subsystem.JMX, null); 66 this.locator = new InvokerLocator(invoker.getLocatorURI()); 67 this.proxies.put(id, this); 68 setupPollingTimer(); 69 } 70 71 77 public static synchronized MBeanServerClientInvokerProxy remove(String id) 78 { 79 return (MBeanServerClientInvokerProxy) proxies.remove(id); 80 } 81 82 88 public static synchronized MBeanServerClientInvokerProxy get(String id) 89 { 90 return (MBeanServerClientInvokerProxy) proxies.get(id); 91 } 92 93 106 protected void setupPollingTimer() 107 { 108 Map map = this.locator.getParameters(); 109 long delay = 1000; if(map != null) 111 { 112 String ds = (String ) map.get("pollinterval"); 113 if(ds != null) 114 { 115 delay = Integer.parseInt(ds); 116 } 117 } 118 if(delay > 0) 119 { 120 this.pollTimer.scheduleAtFixedRate(poller, 2000, delay); 121 } 122 } 123 124 129 public synchronized String getServerId() 130 { 131 return this.serverId; 132 } 133 134 139 public InvokerLocator getLocator() 140 { 141 return locator; 142 } 143 144 public static synchronized MBeanServer create(InvokerLocator locator, String localJmxId, String jmxId) throws Exception 145 { 146 MBeanServer mbeanserver = MBeanServerRegistry.getMBeanServerFor(locator); 148 if(mbeanserver != null) 149 { 150 return mbeanserver; 151 } 152 ClassLoader cl = Thread.currentThread().getContextClassLoader(); 153 if(cl == null) 154 { 155 cl = MBeanServerClientInvokerProxy.class.getClassLoader(); 156 } 157 MBeanServerClientInvokerProxy handler = new MBeanServerClientInvokerProxy(cl, locator, localJmxId, jmxId); 158 mbeanserver = (MBeanServer ) Proxy.newProxyInstance(MBeanServerClientInvokerProxy.class.getClassLoader(), new Class []{MBeanServer .class}, handler); 159 MBeanServerRegistry.register(mbeanserver, handler); 160 log.debug("created MBeanServer proxy to remote mbeanserver at: " + locator + " for JMX Id: " + jmxId + " from our JMX id: " + localJmxId); 161 return mbeanserver; 162 } 163 164 167 public synchronized void destroy() 168 { 169 if(log.isTraceEnabled()) 170 { 171 log.trace("destroy called on: " + this + " for locator: " + locator + ", and serverid: " + serverId); 172 } 173 client.disconnect(); 174 MBeanServerRegistry.unregister(this); 175 cancelPoller(); 176 removeListeners(); 177 paramMap.clear(); 178 proxies.remove(serverId); 179 } 180 181 184 private synchronized void cancelPoller() 185 { 186 if(poller != null) 187 { 188 poller.cancel(); 189 poller = null; 190 } 191 if(pollTimer != null) 192 { 193 pollTimer.cancel(); 194 pollTimer = null; 195 } 196 } 197 198 private synchronized void removeListeners() 199 { 200 Iterator iter = listeners.iterator(); 201 while(iter.hasNext()) 202 { 203 Object key = iter.next(); 204 ClientListener.remove(key); 205 iter.remove(); 206 } 207 } 208 209 private void addNotificationListener(Method m, Object args[], String sig[], Map payload) 210 throws Throwable 211 { 212 String methodName = m.getName(); 213 Object key = ClientListener.register(serverId, (ObjectName ) args[0], args[1], (NotificationFilter ) args[2], args[3]); 214 listeners.add(key); 215 Object a[] = new Object []{args[0], null, args[2], key}; 216 217 client.setSessionId(serverId); 219 client.invoke(new NameBasedInvocation(methodName, a, sig), 220 payload); 221 } 222 223 private void removeNotificationListener(Method m, Object args[], String sig[], Map payload) 224 throws Throwable 225 { 226 String methodName = m.getName(); 227 Object id = ClientListener.makeId(serverId, (ObjectName ) args[0], args[1]); 228 listeners.remove(id); 229 ClientListener cl = ClientListener.remove(id); 230 Object a[] = new Object []{args[0], null, id}; 231 if(cl != null) 232 { 233 client.setSessionId(serverId); 235 client.invoke(new NameBasedInvocation(methodName, 236 a, 237 new String []{ObjectName .class.getName(), 238 sig[1], 239 Integer .class.getName()}), 240 payload); 241 } 242 } 243 244 245 private boolean proxyEquals(Object proxy) 246 { 247 return (proxy.getClass() == this.getClass() && ((MBeanServerClientInvokerProxy) proxy).serverId.equals(serverId)); 248 } 249 250 private Integer proxyHashcode() 251 { 252 return new Integer (client.hashCode() + serverId.hashCode()); 253 } 254 255 public Object invoke(Object proxy, Method method, Object [] args) 256 throws Throwable 257 { 258 259 String methodName = method.getName(); 260 261 if(method.getDeclaringClass() == Object .class) 263 { 264 if(methodName.equals("equals")) 265 { 266 return new Boolean (proxyEquals(args[0])); 267 } 268 else if(methodName.equals("hashCode")) 269 { 270 return proxyHashcode(); 271 } 272 else if(methodName.equals("toString")) 273 { 274 return "MBeanServerClientInvokerProxy [serverid:" + serverId + ",locator:" + client.getInvoker().getLocator() + "]"; 275 } 276 } 277 278 String sig[] = getMethodSignature(method); 279 Map payload = new HashMap (1); 280 if(methodName.equals("addNotificationListener")) 281 { 282 addNotificationListener(method, args, sig, payload); 283 return null; 284 } 285 else if(methodName.equals("removeNotificationListener")) 286 { 287 removeNotificationListener(method, args, sig, payload); 288 return null; 289 } 290 Object value = null; 291 try 292 { 293 client.setSessionId(serverId); 295 value = client.invoke(new NameBasedInvocation(methodName, args, sig), 296 payload); 297 } 298 catch(Throwable throwable) 299 { 300 if(log.isTraceEnabled()) 301 { 302 log.trace("remote invocation failed for method: " + methodName + " to: " + serverId, throwable); 303 } 304 if(throwable instanceof ConnectionFailedException) 305 { 306 destroy(); 307 } 308 rethrowMBeanException(throwable); 309 } 310 finally 311 { 312 if(payload != null) 313 { 314 NotificationQueue queue = (NotificationQueue) payload.get("notifications"); 316 if(queue != null && queue.isEmpty() == false) 317 { 318 deliverNotifications(queue, false); 319 } 320 } 321 } 322 323 return value; 324 } 325 326 private void rethrowMBeanException(Throwable throwable) throws MBeanException , ReflectionException 327 { 328 if(throwable instanceof MBeanException ) 329 { 330 throw (MBeanException ) throwable; 331 } 332 else if(throwable instanceof ReflectionException ) 333 { 334 throw (ReflectionException ) throwable; 335 } 336 else 337 { 338 if(throwable instanceof UndeclaredThrowableException ) 339 { 340 UndeclaredThrowableException ut = (UndeclaredThrowableException ) throwable; 341 if(ut instanceof Exception ) 342 { 343 throw new MBeanException ((Exception ) ut.getUndeclaredThrowable(), ut.getMessage()); 344 } 345 else 346 { 347 throw new MBeanException (new Exception (ut.getUndeclaredThrowable().getMessage())); 348 } 349 } 350 else 351 { 352 if(throwable instanceof Exception ) 353 { 354 throw new MBeanException ((Exception ) throwable, throwable.getMessage()); 355 } 356 throw new MBeanException (new Exception (throwable.getMessage())); 357 } 358 } 359 } 360 361 public void deliverNotifications(NotificationQueue queue, boolean async) throws InterruptedException 362 { 363 if(async && poller != null) 364 { 365 cancelPoller(); 367 } 368 if(queue == null) 369 { 370 return; 371 } 372 373 Iterator iter = queue.iterator(); 374 while(iter.hasNext()) 375 { 376 final NotificationEntry entry = (NotificationEntry) iter.next(); 377 final ClientListener listener = ClientListener.get(entry.getHandBack()); 378 if(listener != null) 379 { 380 if(listener.listener instanceof NotificationListener ) 381 { 382 if(log.isTraceEnabled()) 383 { 384 log.trace("sending notification for entry: " + entry + " to: " + listener.listener); 385 } 386 try 387 { 388 ((NotificationListener ) listener.listener).handleNotification(entry.getNotification(), listener.handback); 389 } 390 catch(Throwable ex) 391 { 392 log.error("Error sending notification: " + entry.getNotification() + " to listener: " + listener); 393 } 394 } 395 else 396 { 397 log.error("called unimplemented addListener method", new Exception ()); 400 } 401 } 402 else 403 { 404 log.warn("couldn't find client listener for handback: " + entry.getHandBack() + "\nentry:" + entry + "\nqueue: " + queue + "\ndump: " + ClientListener.dump()); 405 } 406 } 407 } 408 409 public String [] getMethodSignature(Method method) 410 { 411 if(paramMap.containsKey(method)) 412 { 413 return (String []) paramMap.get(method); 414 } 415 Class paramTypes[] = method.getParameterTypes(); 416 String sig[] = (paramTypes == null) ? null : new String [paramTypes.length]; 417 if(paramTypes != null) 418 { 419 for(int c = 0; c < sig.length; c++) 420 { 421 sig[c] = paramTypes[c].getName(); 422 } 423 } 424 paramMap.put(method, sig); 425 return sig; 426 } 427 428 432 private final class NotificationPoller extends TimerTask 433 { 434 public void run() 435 { 436 try 437 { 438 if(client.isConnected() && ClientListener.hasListeners()) 441 { 442 Map payload = new HashMap (1); 443 client.setSessionId(serverId); 446 Boolean continuePolling = (Boolean ) client.invoke(new NameBasedInvocation("$GetNotifications$", 447 new Object []{}, 448 new String []{}), 449 payload); 450 NotificationQueue queue = (NotificationQueue) payload.get("notifications"); 451 if(queue != null && queue.isEmpty() == false) 452 { 453 deliverNotifications(queue, false); 455 } 456 if(continuePolling.booleanValue() == false) 457 { 458 cancelPoller(); 459 } 460 } 461 } 462 catch(ConnectionFailedException cnf) 463 { 464 destroy(); 466 } 467 catch(Throwable ex) 468 { 469 ex.printStackTrace(); 471 } 472 } 473 } 474 } 475 | Popular Tags |