1 7 8 package com.sun.jmx.remote.internal; 9 10 import java.io.IOException ; 11 import java.io.NotSerializableException ; 12 13 import java.util.ArrayList ; 14 import java.util.HashMap ; 15 import java.util.Map ; 16 import java.util.concurrent.Executor ; 17 18 import java.security.AccessController ; 19 import java.security.PrivilegedAction ; 20 import javax.security.auth.Subject ; 21 22 import javax.management.Notification ; 23 import javax.management.NotificationListener ; 24 import javax.management.NotificationFilter ; 25 import javax.management.ObjectName ; 26 import javax.management.MBeanServerNotification ; 27 import javax.management.InstanceNotFoundException ; 28 import javax.management.ListenerNotFoundException ; 29 30 import javax.management.remote.NotificationResult ; 31 import javax.management.remote.TargetedNotification ; 32 33 import com.sun.jmx.remote.util.ClassLogger; 34 import com.sun.jmx.remote.util.EnvHelp; 35 36 37 public abstract class ClientNotifForwarder { 38 public ClientNotifForwarder(Map env) { 39 this(null, env); 40 } 41 42 private static int threadId; 43 44 64 private static class LinearExecutor implements Executor { 65 public synchronized void execute(Runnable command) { 66 if (this.command != null) 67 throw new IllegalArgumentException ("More than one command"); 68 this.command = command; 69 if (thread == null) { 70 thread = new Thread () { 71 public void run() { 72 while (true) { 73 Runnable r; 74 synchronized (LinearExecutor.this) { 75 if (LinearExecutor.this.command == null) { 76 thread = null; 77 return; 78 } else { 79 r = LinearExecutor.this.command; 80 LinearExecutor.this.command = null; 81 } 82 } 83 r.run(); 84 } 85 } 86 }; 87 thread.setDaemon(true); 88 thread.setName("ClientNotifForwarder-" + ++threadId); 89 thread.start(); 90 } 91 } 92 93 private Runnable command; 94 private Thread thread; 95 } 96 97 public ClientNotifForwarder(ClassLoader defaultClassLoader, Map env) { 98 maxNotifications = EnvHelp.getMaxFetchNotifNumber(env); 99 timeout = EnvHelp.getFetchTimeout(env); 100 101 105 Executor ex = (Executor ) 106 env.get("jmx.remote.x.fetch.notifications.executor"); 107 if (ex == null) 108 ex = new LinearExecutor(); 109 else if (logger.traceOn()) 110 logger.trace("ClientNotifForwarder", "executor is " + ex); 111 112 this.defaultClassLoader = defaultClassLoader; 113 this.executor = ex; 114 } 115 116 119 abstract protected NotificationResult fetchNotifs(long clientSequenceNumber, 120 int maxNotifications, 121 long timeout) 122 throws IOException , ClassNotFoundException ; 123 124 abstract protected Integer addListenerForMBeanRemovedNotif() 125 throws IOException , InstanceNotFoundException ; 126 127 abstract protected void removeListenerForMBeanRemovedNotif(Integer id) 128 throws IOException , InstanceNotFoundException , 129 ListenerNotFoundException ; 130 131 134 abstract protected void lostNotifs(String message, long number); 135 136 137 public synchronized void addNotificationListener(Integer listenerID, 138 ObjectName name, 139 NotificationListener listener, 140 NotificationFilter filter, 141 Object handback, 142 Subject delegationSubject) 143 throws IOException , InstanceNotFoundException { 144 145 if (logger.traceOn()) { 146 logger.trace("addNotificationListener", 147 "Add the listener "+listener+" at "+name); 148 } 149 150 infoList.put(listenerID, 151 new ClientListenerInfo(listenerID, 152 name, 153 listener, 154 filter, 155 handback, 156 delegationSubject)); 157 158 159 init(false); 160 } 161 162 public synchronized Integer [] 163 removeNotificationListener(ObjectName name, 164 NotificationListener listener) 165 throws ListenerNotFoundException , IOException { 166 167 beforeRemove(); 168 169 if (logger.traceOn()) { 170 logger.trace("removeNotificationListener", 171 "Remove the listener "+listener+" from "+name); 172 } 173 174 ArrayList ids = new ArrayList (); 175 ArrayList values = new ArrayList (infoList.values()); 176 for (int i=values.size()-1; i>=0; i--) { 177 ClientListenerInfo li = (ClientListenerInfo)values.get(i); 178 179 if (li.sameAs(name, listener)) { 180 ids.add(li.getListenerID()); 181 182 infoList.remove(li.getListenerID()); 183 } 184 } 185 186 if (ids.isEmpty()) 187 throw new ListenerNotFoundException ("Listener not found"); 188 189 return (Integer [])ids.toArray(new Integer [0]); 190 } 191 192 public synchronized Integer 193 removeNotificationListener(ObjectName name, 194 NotificationListener listener, 195 NotificationFilter filter, 196 Object handback) 197 throws ListenerNotFoundException , IOException { 198 199 if (logger.traceOn()) { 200 logger.trace("removeNotificationListener", 201 "Remove the listener "+listener+" from "+name); 202 } 203 204 beforeRemove(); 205 206 Integer id = null; 207 208 ArrayList values = new ArrayList (infoList.values()); 209 for (int i=values.size()-1; i>=0; i--) { 210 ClientListenerInfo li = (ClientListenerInfo)values.get(i); 211 if (li.sameAs(name, listener, filter, handback)) { 212 id=li.getListenerID(); 213 214 infoList.remove(id); 215 216 break; 217 } 218 } 219 220 if (id == null) 221 throw new ListenerNotFoundException ("Listener not found"); 222 223 return id; 224 } 225 226 public synchronized Integer [] removeNotificationListener(ObjectName name) { 227 if (logger.traceOn()) { 228 logger.trace("removeNotificationListener", 229 "Remove all listeners registered at "+name); 230 } 231 232 ArrayList ids = new ArrayList (); 233 234 ArrayList values = new ArrayList (infoList.values()); 235 for (int i=values.size()-1; i>=0; i--) { 236 ClientListenerInfo li = (ClientListenerInfo)values.get(i); 237 if (li.sameAs(name)) { 238 ids.add(li.getListenerID()); 239 240 infoList.remove(li.getListenerID()); 241 } 242 } 243 244 return (Integer []) ids.toArray(new Integer [0]); 245 } 246 247 public synchronized ListenerInfo[] getListenerInfo() { 248 return (ListenerInfo[])infoList.values().toArray(new ListenerInfo[0]); 249 } 250 251 262 public synchronized ClientListenerInfo[] preReconnection() throws IOException { 263 if (state == TERMINATED || beingReconnected) { throw new IOException ("Illegal state."); 265 } 266 267 final ClientListenerInfo[] tmp = (ClientListenerInfo[]) 268 infoList.values().toArray(new ClientListenerInfo[0]); 269 270 271 beingReconnected = true; 272 273 infoList.clear(); 274 275 if (currentFetchThread == Thread.currentThread()) { 276 279 return tmp; 280 } 281 282 while (state == STARTING) { 283 try { 284 wait(); 285 } catch (InterruptedException ire) { 286 IOException ioe = new IOException (ire.toString()); 287 EnvHelp.initCause(ioe, ire); 288 289 throw ioe; 290 } 291 } 292 293 if (state == STARTED) { 294 setState(STOPPING); 295 } 296 297 return tmp; 298 } 299 300 305 public synchronized void postReconnection(ClientListenerInfo[] listenerInfos) 306 throws IOException { 307 308 if (state == TERMINATED) { 309 return; 310 } 311 312 while (state == STOPPING) { 313 try { 314 wait(); 315 } catch (InterruptedException ire) { 316 IOException ioe = new IOException (ire.toString()); 317 EnvHelp.initCause(ioe, ire); 318 throw ioe; 319 } 320 } 321 322 final boolean trace = logger.traceOn(); 323 final int len = listenerInfos.length; 324 325 for (int i=0; i<len; i++) { 326 if (trace) { 327 logger.trace("addNotificationListeners", 328 "Add a listener at "+ 329 listenerInfos[i].getListenerID()); 330 } 331 332 infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]); 333 } 334 335 beingReconnected = false; 336 notifyAll(); 337 338 if (currentFetchThread == Thread.currentThread()) { 339 try { 341 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif(); 342 } catch (Exception e) { 343 final String msg = 344 "Failed to register a listener to the mbean " + 345 "server: the client will not do clean when an MBean " + 346 "is unregistered"; 347 if (logger.traceOn()) { 348 logger.trace("init", msg, e); 349 } 350 } 351 } else if (listenerInfos.length > 0) { init(true); 353 } else if (infoList.size() > 0) { 354 init(false); 356 } 357 } 358 359 public synchronized void terminate() { 360 if (state == TERMINATED) { 361 return; 362 } 363 364 if (logger.traceOn()) { 365 logger.trace("terminate", "Terminating..."); 366 } 367 368 if (state == STARTED) { 369 infoList.clear(); 370 } 371 372 setState(TERMINATED); 373 } 374 375 private class NotifFetcher implements Runnable { 380 public void run() { 381 synchronized (ClientNotifForwarder.this) { 382 currentFetchThread = Thread.currentThread(); 383 384 if (state == STARTING) 385 setState(STARTED); 386 } 387 388 if (defaultClassLoader != null) { 389 AccessController.doPrivileged(new PrivilegedAction () { 390 public Object run() { 391 Thread.currentThread(). 392 setContextClassLoader(defaultClassLoader); 393 return null; 394 } 395 }); 396 } 397 398 NotificationResult nr = null; 399 if (!shouldStop() && (nr = fetchNotifs()) != null) { 400 402 final TargetedNotification [] notifs = 403 nr.getTargetedNotifications(); 404 final int len = notifs.length; 405 final HashMap listeners; 406 final Integer myListenerID; 407 408 long missed = 0; 409 410 synchronized(ClientNotifForwarder.this) { 411 if (clientSequenceNumber >= 0) { 414 missed = nr.getEarliestSequenceNumber() - 415 clientSequenceNumber; 416 } 417 418 clientSequenceNumber = nr.getNextSequenceNumber(); 419 420 final int size = infoList.size(); 421 listeners = new HashMap (((size>len)?len:size)); 422 423 for (int i = 0 ; i < len ; i++) { 424 final TargetedNotification tn = notifs[i]; 425 final Integer listenerID = tn.getListenerID(); 426 427 if (!listenerID.equals(mbeanRemovedNotifID)) { 429 final ListenerInfo li = 430 (ListenerInfo) infoList.get(listenerID); 431 if (li != null) 432 listeners.put(listenerID,li); 433 continue; 434 } 435 final Notification notif = tn.getNotification(); 436 final String unreg = 437 MBeanServerNotification.UNREGISTRATION_NOTIFICATION; 438 if (notif instanceof MBeanServerNotification && 439 notif.getType().equals(unreg)) { 440 441 MBeanServerNotification mbsn = 442 (MBeanServerNotification ) notif; 443 ObjectName name = mbsn.getMBeanName(); 444 445 removeNotificationListener(name); 446 } 447 } 448 myListenerID = mbeanRemovedNotifID; 449 } 450 451 if (missed > 0) { 452 final String msg = 453 "May have lost up to " + missed + 454 " notification" + (missed == 1 ? "" : "s"); 455 lostNotifs(msg, missed); 456 logger.trace("NotifFetcher.run", msg); 457 } 458 459 for (int i = 0 ; i < len ; i++) { 461 final TargetedNotification tn = notifs[i]; 462 dispatchNotification(tn,myListenerID,listeners); 463 } 464 } 465 466 synchronized (ClientNotifForwarder.this) { 467 currentFetchThread = null; 468 } 469 470 if (nr == null || shouldStop()) { 471 setState(STOPPED); 473 } else { 474 executor.execute(this); 475 } 476 } 477 478 void dispatchNotification(TargetedNotification tn, 479 Integer myListenerID, Map listeners) { 480 final Notification notif = tn.getNotification(); 481 final Integer listenerID = tn.getListenerID(); 482 483 if (listenerID.equals(myListenerID)) return; 484 final ListenerInfo li = (ClientListenerInfo) 485 listeners.get(listenerID); 486 487 if (li == null) { 488 logger.trace("NotifFetcher.dispatch", 489 "Listener ID not in map"); 490 return; 491 } 492 493 NotificationListener l = li.getListener(); 494 Object h = li.getHandback(); 495 try { 496 l.handleNotification(notif, h); 497 } catch (RuntimeException e) { 498 final String msg = 499 "Failed to forward a notification " + 500 "to a listener"; 501 logger.trace("NotifFetcher-run", msg, e); 502 } 503 504 } 505 506 private NotificationResult fetchNotifs() { 507 try { 508 NotificationResult nr = ClientNotifForwarder.this. 509 fetchNotifs(clientSequenceNumber,maxNotifications, 510 timeout); 511 512 if (logger.traceOn()) { 513 logger.trace("NotifFetcher-run", 514 "Got notifications from the server: "+nr); 515 } 516 517 return nr; 518 } catch (ClassNotFoundException e) { 519 logger.trace("NotifFetcher.fetchNotifs", e); 520 return fetchOneNotif(); 521 } catch (NotSerializableException e) { 522 logger.trace("NotifFetcher.fetchNotifs", e); 523 return fetchOneNotif(); 524 } catch (IOException ioe) { 525 if (!shouldStop()) { 526 logger.error("NotifFetcher-run", 527 "Failed to fetch notification, " + 528 "stopping thread. Error is: " + ioe, ioe); 529 logger.debug("NotifFetcher-run",ioe); 530 } 531 532 return null; 534 } 535 } 536 537 555 private NotificationResult fetchOneNotif() { 556 ClientNotifForwarder cnf = ClientNotifForwarder.this; 557 558 long startSequenceNumber = clientSequenceNumber; 559 560 int notFoundCount = 0; 561 562 NotificationResult result = null; 563 564 while (result == null && !shouldStop()) { 565 NotificationResult nr; 566 567 try { 568 nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L); 570 } catch (ClassNotFoundException e) { 571 logger.warning("NotifFetcher.fetchOneNotif", 572 "Impossible exception: " + e); 573 logger.debug("NotifFetcher.fetchOneNotif",e); 574 return null; 575 } catch (IOException e) { 576 if (!shouldStop()) 577 logger.trace("NotifFetcher.fetchOneNotif", e); 578 return null; 579 } 580 581 if (shouldStop()) 582 return null; 583 584 startSequenceNumber = nr.getNextSequenceNumber(); 585 586 try { 587 result = cnf.fetchNotifs(startSequenceNumber, 1, 0L); 589 } catch (Exception e) { 590 if (e instanceof ClassNotFoundException 591 || e instanceof NotSerializableException ) { 592 logger.warning("NotifFetcher.fetchOneNotif", 593 "Failed to deserialize a notification: "+e.toString()); 594 if (logger.traceOn()) { 595 logger.trace("NotifFetcher.fetchOneNotif", 596 "Failed to deserialize a notification.", e); 597 } 598 599 notFoundCount++; 600 startSequenceNumber++; 601 } else { 602 if (!shouldStop()) 603 logger.trace("NotifFetcher.fetchOneNotif", e); 604 return null; 605 } 606 } 607 } 608 609 if (notFoundCount > 0) { 610 final String msg = 611 "Dropped " + notFoundCount + " notification" + 612 (notFoundCount == 1 ? "" : "s") + 613 " because classes were missing locally"; 614 lostNotifs(msg, notFoundCount); 615 } 616 617 return result; 618 } 619 620 private boolean shouldStop() { 621 synchronized (ClientNotifForwarder.this) { 622 if (state != STARTED) { 623 return true; 624 } else if (infoList.size() == 0) { 625 setState(STOPPING); 627 628 return true; 629 } 630 631 return false; 632 } 633 } 634 } 635 636 637 private synchronized void setState(int newState) { 641 if (state == TERMINATED) { 642 return; 643 } 644 645 state = newState; 646 this.notifyAll(); 647 } 648 649 657 private synchronized void init(boolean reconnected) throws IOException { 658 switch (state) { 659 case STARTED: 660 return; 661 case STARTING: 662 return; 663 case TERMINATED: 664 throw new IOException ("The ClientNotifForwarder has been terminated."); 665 case STOPPING: 666 if (beingReconnected == true) { 667 return; 669 } 670 671 while (state == STOPPING) { try { 673 wait(); 674 } catch (InterruptedException ire) { 675 IOException ioe = new IOException (ire.toString()); 676 EnvHelp.initCause(ioe, ire); 677 678 throw ioe; 679 } 680 } 681 682 init(reconnected); 685 686 return; 687 case STOPPED: 688 if (beingReconnected == true) { 689 return; 691 } 692 693 if (logger.traceOn()) { 694 logger.trace("init", "Initializing..."); 695 } 696 697 if (!reconnected) { 699 try { 700 NotificationResult nr = fetchNotifs(-1, 0, 0); 701 clientSequenceNumber = nr.getNextSequenceNumber(); 702 } catch (ClassNotFoundException e) { 703 logger.warning("init", "Impossible exception: "+ e); 705 logger.debug("init",e); 706 } 707 } 708 709 try { 711 mbeanRemovedNotifID = addListenerForMBeanRemovedNotif(); 712 } catch (Exception e) { 713 final String msg = 714 "Failed to register a listener to the mbean " + 715 "server: the client will not do clean when an MBean " + 716 "is unregistered"; 717 if (logger.traceOn()) { 718 logger.trace("init", msg, e); 719 } 720 } 721 722 setState(STARTING); 723 724 executor.execute(new NotifFetcher()); 726 727 return; 728 default: 729 throw new IOException ("Unknown state."); 731 } 732 } 733 734 738 private synchronized void beforeRemove() throws IOException { 739 while (beingReconnected) { 740 if (state == TERMINATED) { 741 throw new IOException ("Terminated."); 742 } 743 744 try { 745 wait(); 746 } catch (InterruptedException ire) { 747 IOException ioe = new IOException (ire.toString()); 748 EnvHelp.initCause(ioe, ire); 749 750 throw ioe; 751 } 752 } 753 754 if (state == TERMINATED) { 755 throw new IOException ("Terminated."); 756 } 757 } 758 759 763 private final ClassLoader defaultClassLoader; 764 private final Executor executor; 765 766 private final HashMap infoList = new HashMap (); 767 769 private long clientSequenceNumber = -1; 771 private final int maxNotifications; 772 private final long timeout; 773 774 private Integer mbeanRemovedNotifID = null; 775 776 private Thread currentFetchThread; 777 778 private boolean inited = false; 780 781 785 private static final int STARTING = 0; 786 787 790 private static final int STARTED = 1; 791 792 795 private static final int STOPPING = 2; 796 797 800 private static final int STOPPED = 3; 801 802 806 private static final int TERMINATED = 4; 807 808 private int state = STOPPED; 809 810 817 private boolean beingReconnected = false; 818 819 private static final ClassLogger logger = 820 new ClassLogger("javax.management.remote.misc", 821 "ClientNotifForwarder"); 822 } 823 | Popular Tags |