1 7 8 package com.sun.jmx.remote.internal; 9 10 import java.io.IOException ; 11 import java.security.AccessController ; 12 import java.security.PrivilegedAction ; 13 import java.security.PrivilegedActionException ; 14 import java.security.PrivilegedExceptionAction ; 15 import java.util.ArrayList ; 16 import java.util.Collection ; 17 import java.util.HashSet ; 18 import java.util.Iterator ; 19 import java.util.List ; 20 import java.util.Set ; 21 import java.util.HashMap ; 22 import java.util.Map ; 23 24 import javax.management.InstanceNotFoundException ; 25 import javax.management.ListenerNotFoundException ; 26 import javax.management.MalformedObjectNameException ; 27 import javax.management.MBeanServer ; 28 import javax.management.MBeanServerNotification ; 29 import javax.management.Notification ; 30 import javax.management.NotificationBroadcaster ; 31 import javax.management.NotificationFilter ; 32 import javax.management.NotificationFilterSupport ; 33 import javax.management.NotificationListener ; 34 import javax.management.ObjectName ; 35 import javax.management.QueryEval ; 36 import javax.management.QueryExp ; 37 38 import javax.management.remote.NotificationResult ; 39 import javax.management.remote.TargetedNotification ; 40 41 import com.sun.jmx.remote.util.EnvHelp; 42 import com.sun.jmx.remote.util.ClassLogger; 43 44 45 public class ArrayNotificationBuffer implements NotificationBuffer { 46 47 private boolean disposed = false; 48 49 51 private static final 52 HashMap <MBeanServer ,ArrayNotificationBuffer> mbsToBuffer = 53 new HashMap <MBeanServer ,ArrayNotificationBuffer>(1); 54 private final Collection <ShareBuffer> sharers = new HashSet <ShareBuffer>(1); 55 56 public static synchronized NotificationBuffer 57 getNotificationBuffer(MBeanServer mbs, Map env) { 58 59 int queueSize = EnvHelp.getNotifBufferSize(env); 61 62 ArrayNotificationBuffer buf = mbsToBuffer.get(mbs); 63 if (buf == null) { 64 buf = new ArrayNotificationBuffer(mbs, queueSize); 65 mbsToBuffer.put(mbs, buf); 66 } 67 return buf.new ShareBuffer(queueSize); 68 } 69 70 public static synchronized void removeNotificationBuffer(MBeanServer mbs) { 71 mbsToBuffer.remove(mbs); 72 } 73 74 synchronized void addSharer(ShareBuffer sharer) { 75 if (sharer.getSize() > queueSize) 76 resize(sharer.getSize()); 77 sharers.add(sharer); 78 } 79 80 void removeSharer(ShareBuffer sharer) { 81 boolean empty; 82 synchronized (this) { 83 sharers.remove(sharer); 84 empty = sharers.isEmpty(); 85 if (!empty) { 86 int max = 0; 87 for (ShareBuffer buf : sharers) { 88 int bufsize = buf.getSize(); 89 if (bufsize > max) 90 max = bufsize; 91 } 92 if (max < queueSize) 93 resize(max); 94 } 95 } 96 if (empty) 97 dispose(); 98 } 99 100 private void resize(int newSize) { 101 if (newSize == queueSize) 102 return; 103 while (queue.size() > newSize) 104 dropNotification(); 105 queue.resize(newSize); 106 queueSize = newSize; 107 } 108 109 private class ShareBuffer implements NotificationBuffer { 110 ShareBuffer(int size) { 111 this.size = size; 112 addSharer(this); 113 } 114 115 public NotificationResult 116 fetchNotifications(Set <ListenerInfo> listeners, 117 long startSequenceNumber, 118 long timeout, 119 int maxNotifications) 120 throws InterruptedException { 121 NotificationBuffer buf = ArrayNotificationBuffer.this; 122 return buf.fetchNotifications(listeners, startSequenceNumber, 123 timeout, maxNotifications); 124 } 125 126 public void dispose() { 127 ArrayNotificationBuffer.this.removeSharer(this); 128 } 129 130 int getSize() { 131 return size; 132 } 133 134 private final int size; 135 } 136 137 138 140 private ArrayNotificationBuffer(MBeanServer mbs, int queueSize) { 141 if (logger.traceOn()) 142 logger.trace("Constructor", "queueSize=" + queueSize); 143 144 if (mbs == null || queueSize < 1) 145 throw new IllegalArgumentException ("Bad args"); 146 147 this.mBeanServer = mbs; 148 this.queueSize = queueSize; 149 this.queue = new ArrayQueue<NamedNotification>(queueSize); 150 this.earliestSequenceNumber = System.currentTimeMillis(); 151 this.nextSequenceNumber = this.earliestSequenceNumber; 152 153 createListeners(); 154 155 logger.trace("Constructor", "ends"); 156 } 157 158 private synchronized boolean isDisposed() { 159 return disposed; 160 } 161 162 public void dispose() { 163 logger.trace("dispose", "starts"); 164 165 synchronized(this) { 166 removeNotificationBuffer(mBeanServer); 167 disposed = true; 168 notifyAll(); 170 } 171 172 destroyListeners(); 173 174 logger.trace("dispose", "ends"); 175 } 176 177 208 public NotificationResult 209 fetchNotifications(Set <ListenerInfo> listeners, 210 long startSequenceNumber, 211 long timeout, 212 int maxNotifications) 213 throws InterruptedException { 214 215 logger.trace("fetchNotifications", "starts"); 216 217 if (startSequenceNumber < 0 || isDisposed()) { 218 synchronized(this) { 219 return new NotificationResult (earliestSequenceNumber(), 220 nextSequenceNumber(), 221 new TargetedNotification [0]); 222 } 223 } 224 225 if (listeners == null 227 || startSequenceNumber < 0 || timeout < 0 228 || maxNotifications < 0) { 229 logger.trace("fetchNotifications", "Bad args"); 230 throw new IllegalArgumentException ("Bad args to fetch"); 231 } 232 233 if (logger.debugOn()) { 234 logger.trace("fetchNotifications", 235 "listener-length=" + listeners.size() + "; startSeq=" + 236 startSequenceNumber + "; timeout=" + timeout + 237 "; max=" + maxNotifications); 238 } 239 240 if (startSequenceNumber > nextSequenceNumber()) { 241 final String msg = "Start sequence number too big: " + 242 startSequenceNumber + " > " + nextSequenceNumber(); 243 logger.trace("fetchNotifications", msg); 244 throw new IllegalArgumentException (msg); 245 } 246 247 252 long endTime = System.currentTimeMillis() + timeout; 253 if (endTime < 0) endTime = Long.MAX_VALUE; 255 256 if (logger.debugOn()) 257 logger.debug("fetchNotifications", "endTime=" + endTime); 258 259 263 long earliestSeq = -1; 264 long nextSeq = startSequenceNumber; 265 List <TargetedNotification > notifs = 266 new ArrayList <TargetedNotification >(); 267 268 270 while (true) { 271 logger.debug("fetchNotifications", "main loop starts"); 272 273 NamedNotification candidate; 274 275 277 synchronized (this) { 278 279 281 if (earliestSeq < 0) { 282 earliestSeq = earliestSequenceNumber(); 283 if (logger.debugOn()) { 284 logger.debug("fetchNotifications", 285 "earliestSeq=" + earliestSeq); 286 } 287 if (nextSeq < earliestSeq) { 288 nextSeq = earliestSeq; 289 logger.debug("fetchNotifications", 290 "nextSeq=earliestSeq"); 291 } 292 } else 293 earliestSeq = earliestSequenceNumber(); 294 295 300 if (nextSeq < earliestSeq) { 301 logger.trace("fetchNotifications", 302 "nextSeq=" + nextSeq + " < " + "earliestSeq=" + 303 earliestSeq + " so may have lost notifs"); 304 break; 305 } 306 307 if (nextSeq < nextSequenceNumber()) { 308 candidate = notificationAt(nextSeq); 309 if (logger.debugOn()) { 310 logger.debug("fetchNotifications", "candidate: " + 311 candidate); 312 logger.debug("fetchNotifications", "nextSeq now " + 313 nextSeq); 314 } 315 } else { 316 320 if (notifs.size() > 0) { 321 logger.debug("fetchNotifications", 322 "no more notifs but have some so don't wait"); 323 break; 324 } 325 long toWait = endTime - System.currentTimeMillis(); 326 if (toWait <= 0) { 327 logger.debug("fetchNotifications", "timeout"); 328 break; 329 } 330 331 332 if (isDisposed()) { 333 if (logger.debugOn()) 334 logger.debug("fetchNotifications", 335 "dispose callled, no wait"); 336 return new NotificationResult (earliestSequenceNumber(), 337 nextSequenceNumber(), 338 new TargetedNotification [0]); 339 } 340 341 if (logger.debugOn()) 342 logger.debug("fetchNotifications", 343 "wait(" + toWait + ")"); 344 wait(toWait); 345 346 continue; 347 } 348 } 349 350 355 ObjectName name = candidate.getObjectName(); 356 Notification notif = candidate.getNotification(); 357 List <TargetedNotification > matchedNotifs = 358 new ArrayList <TargetedNotification >(); 359 logger.debug("fetchNotifications", 360 "applying filters to candidate"); 361 synchronized (listeners) { 362 for (ListenerInfo li : listeners) { 363 ObjectName pattern = li.getObjectName(); 364 NotificationFilter filter = li.getNotificationFilter(); 365 366 if (logger.debugOn()) { 367 logger.debug("fetchNotifications", 368 "pattern=<" + pattern + ">; filter=" + filter); 369 } 370 371 if (pattern.apply(name)) { 372 logger.debug("fetchNotifications", "pattern matches"); 373 if (filter == null 374 || filter.isNotificationEnabled(notif)) { 375 logger.debug("fetchNotifications", 376 "filter matches"); 377 Integer listenerID = li.getListenerID(); 378 TargetedNotification tn = 379 new TargetedNotification (notif, listenerID); 380 matchedNotifs.add(tn); 381 } 382 } 383 } 384 } 385 386 if (matchedNotifs.size() > 0) { 387 392 if (maxNotifications <= 0) { 393 logger.debug("fetchNotifications", 394 "reached maxNotifications"); 395 break; 396 } 397 --maxNotifications; 398 if (logger.debugOn()) 399 logger.debug("fetchNotifications", "add: " + 400 matchedNotifs); 401 notifs.addAll(matchedNotifs); 402 } 403 404 ++nextSeq; 405 } 407 408 int nnotifs = notifs.size(); 409 TargetedNotification [] resultNotifs = 410 new TargetedNotification [nnotifs]; 411 notifs.toArray(resultNotifs); 412 NotificationResult nr = 413 new NotificationResult (earliestSeq, nextSeq, resultNotifs); 414 if (logger.debugOn()) 415 logger.debug("fetchNotifications", nr.toString()); 416 logger.trace("fetchNotifications", "ends"); 417 418 return nr; 419 } 420 421 synchronized long earliestSequenceNumber() { 422 return earliestSequenceNumber; 423 } 424 425 synchronized long nextSequenceNumber() { 426 return nextSequenceNumber; 427 } 428 429 synchronized void addNotification(NamedNotification notif) { 430 if (logger.traceOn()) 431 logger.trace("addNotification", notif.toString()); 432 433 while (queue.size() >= queueSize) { 434 dropNotification(); 435 if (logger.debugOn()) { 436 logger.debug("addNotification", 437 "dropped oldest notif, earliestSeq=" + 438 earliestSequenceNumber); 439 } 440 } 441 queue.add(notif); 442 nextSequenceNumber++; 443 if (logger.debugOn()) 444 logger.debug("addNotification", "nextSeq=" + nextSequenceNumber); 445 notifyAll(); 446 } 447 448 private void dropNotification() { 449 queue.remove(0); 450 earliestSequenceNumber++; 451 } 452 453 synchronized NamedNotification notificationAt(long seqNo) { 454 long index = seqNo - earliestSequenceNumber; 455 if (index < 0 || index > Integer.MAX_VALUE) { 456 final String msg = "Bad sequence number: " + seqNo + " (earliest " 457 + earliestSequenceNumber + ")"; 458 logger.trace("notificationAt", msg); 459 throw new IllegalArgumentException (msg); 460 } 461 return queue.get((int) index); 462 } 463 464 private static class NamedNotification { 465 NamedNotification(ObjectName sender, Notification notif) { 466 this.sender = sender; 467 this.notification = notif; 468 } 469 470 ObjectName getObjectName() { 471 return sender; 472 } 473 474 Notification getNotification() { 475 return notification; 476 } 477 478 public String toString() { 479 return "NamedNotification(" + sender + ", " + notification + ")"; 480 } 481 482 private final ObjectName sender; 483 private final Notification notification; 484 } 485 486 518 private void createListeners() { 519 logger.debug("createListeners", "starts"); 520 521 synchronized (this) { 522 createdDuringQuery = new HashSet <ObjectName >(); 523 } 524 525 try { 526 addNotificationListener(delegateName, 527 creationListener, creationFilter, null); 528 logger.debug("createListeners", "added creationListener"); 529 } catch (Exception e) { 530 final String msg = "Can't add listener to MBean server delegate: "; 531 RuntimeException re = new IllegalArgumentException (msg + e); 532 EnvHelp.initCause(re, e); 533 logger.fine("createListeners", msg + e); 534 logger.debug("createListeners", e); 535 throw re; 536 } 537 538 540 Set <ObjectName > names = queryNames(null, broadcasterQuery); 541 names = new HashSet <ObjectName >(names); 542 543 synchronized (this) { 544 names.addAll(createdDuringQuery); 545 createdDuringQuery = null; 546 } 547 548 for (ObjectName name : names) 549 addBufferListener(name); 550 logger.debug("createListeners", "ends"); 551 } 552 553 private void addBufferListener(ObjectName name) { 554 if (logger.debugOn()) 555 logger.debug("addBufferListener", name.toString()); 556 try { 557 addNotificationListener(name, bufferListener, null, name); 558 } catch (Exception e) { 559 logger.trace("addBufferListener", e); 560 563 } 564 } 565 566 private void removeBufferListener(ObjectName name) { 567 if (logger.debugOn()) 568 logger.debug("removeBufferListener", name.toString()); 569 try { 570 removeNotificationListener(name, bufferListener); 571 } catch (Exception e) { 572 logger.trace("removeBufferListener", e); 573 } 574 } 575 576 private void addNotificationListener(final ObjectName name, 577 final NotificationListener listener, 578 final NotificationFilter filter, 579 final Object handback) 580 throws Exception { 581 try { 582 AccessController.doPrivileged(new PrivilegedExceptionAction () { 583 public Object run() throws InstanceNotFoundException { 584 mBeanServer.addNotificationListener(name, 585 listener, 586 filter, 587 handback); 588 return null; 589 } 590 }); 591 } catch (Exception e) { 592 throw extractException(e); 593 } 594 } 595 596 private void removeNotificationListener(final ObjectName name, 597 final NotificationListener listener) 598 throws Exception { 599 try { 600 AccessController.doPrivileged(new PrivilegedExceptionAction () { 601 public Object run() throws Exception { 602 mBeanServer.removeNotificationListener(name, listener); 603 return null; 604 } 605 }); 606 } catch (Exception e) { 607 throw extractException(e); 608 } 609 } 610 611 private Set <ObjectName > queryNames(final ObjectName name, 612 final QueryExp query) { 613 PrivilegedAction <Set <ObjectName >> act = 614 new PrivilegedAction <Set <ObjectName >>() { 615 public Set <ObjectName > run() { 616 return mBeanServer.queryNames(name, query); 617 } 618 }; 619 try { 620 return AccessController.doPrivileged(act); 621 } catch (RuntimeException e) { 622 logger.fine("queryNames", "Failed to query names: " + e); 623 logger.debug("queryNames", e); 624 throw e; 625 } 626 } 627 628 private static boolean isInstanceOf(final MBeanServer mbs, 629 final ObjectName name, 630 final String className) { 631 PrivilegedExceptionAction <Boolean > act = 632 new PrivilegedExceptionAction <Boolean >() { 633 public Boolean run() throws InstanceNotFoundException { 634 return mbs.isInstanceOf(name, className); 635 } 636 }; 637 try { 638 return AccessController.doPrivileged(act); 639 } catch (Exception e) { 640 logger.fine("isInstanceOf", "failed: " + e); 641 logger.debug("isInstanceOf", e); 642 return false; 643 } 644 } 645 646 654 private void createdNotification(MBeanServerNotification n) { 655 final String shouldEqual = 656 MBeanServerNotification.REGISTRATION_NOTIFICATION; 657 if (!n.getType().equals(shouldEqual)) { 658 logger.warning("createNotification", "bad type: " + n.getType()); 659 return; 660 } 661 662 ObjectName name = n.getMBeanName(); 663 if (logger.debugOn()) 664 logger.debug("createdNotification", "for: " + name); 665 666 synchronized (this) { 667 if (createdDuringQuery != null) { 668 createdDuringQuery.add(name); 669 return; 670 } 671 } 672 673 if (isInstanceOf(mBeanServer, name, broadcasterClass)) { 674 addBufferListener(name); 675 if (isDisposed()) 676 removeBufferListener(name); 677 } 678 } 679 680 private class BufferListener implements NotificationListener { 681 public void handleNotification(Notification notif, Object handback) { 682 if (logger.debugOn()) { 683 logger.debug("BufferListener.handleNotification", 684 "notif=" + notif + "; handback=" + handback); 685 } 686 ObjectName name = (ObjectName ) handback; 687 addNotification(new NamedNotification(name, notif)); 688 } 689 } 690 691 private final NotificationListener bufferListener = new BufferListener(); 692 693 private static class BroadcasterQuery 694 extends QueryEval implements QueryExp { 695 public boolean apply(final ObjectName name) { 696 final MBeanServer mbs = QueryEval.getMBeanServer(); 697 return isInstanceOf(mbs, name, broadcasterClass); 698 } 699 } 700 private static final QueryExp broadcasterQuery = new BroadcasterQuery(); 701 702 private static final NotificationFilter creationFilter; 703 static { 704 NotificationFilterSupport nfs = new NotificationFilterSupport (); 705 nfs.enableType(MBeanServerNotification.REGISTRATION_NOTIFICATION); 706 creationFilter = nfs; 707 } 708 709 private final NotificationListener creationListener = 710 new NotificationListener () { 711 public void handleNotification(Notification notif, 712 Object handback) { 713 logger.debug("creationListener", "handleNotification called"); 714 createdNotification((MBeanServerNotification ) notif); 715 } 716 }; 717 718 private void destroyListeners() { 719 logger.debug("destroyListeners", "starts"); 720 try { 721 removeNotificationListener(delegateName, 722 creationListener); 723 } catch (Exception e) { 724 logger.warning("remove listener from MBeanServer delegate", e); 725 } 726 Set <ObjectName > names = queryNames(null, broadcasterQuery); 727 for (final ObjectName name : names) { 728 if (logger.debugOn()) 729 logger.debug("destroyListeners", 730 "remove listener from " + name); 731 removeBufferListener(name); 732 } 733 logger.debug("destroyListeners", "ends"); 734 } 735 736 740 private static Exception extractException(Exception e) { 741 while (e instanceof PrivilegedActionException ) { 742 e = ((PrivilegedActionException )e).getException(); 743 } 744 return e; 745 } 746 747 private static final ClassLogger logger = 748 new ClassLogger("javax.management.remote.misc", 749 "ArrayNotificationBuffer"); 750 751 private static final ObjectName delegateName; 752 static { 753 try { 754 delegateName = 755 ObjectName.getInstance("JMImplementation:" + 756 "type=MBeanServerDelegate"); 757 } catch (MalformedObjectNameException e) { 758 RuntimeException re = 759 new RuntimeException ("Can't create delegate name: " + e); 760 EnvHelp.initCause(re, e); 761 logger.error("<init>", "Can't create delegate name: " + e); 762 logger.debug("<init>",e); 763 throw re; 764 } 765 } 766 767 private final MBeanServer mBeanServer; 768 private final ArrayQueue<NamedNotification> queue; 769 private int queueSize; 770 private long earliestSequenceNumber; 771 private long nextSequenceNumber; 772 private Set <ObjectName > createdDuringQuery; 773 774 static final String broadcasterClass = 775 NotificationBroadcaster .class.getName(); 776 } 777 | Popular Tags |