1 package org.jboss.mx.remoting.rmi; 2 3 import java.util.ArrayList ; 4 import java.util.HashMap ; 5 import java.util.List ; 6 import java.util.Map ; 7 import java.util.Collection ; 8 import javax.management.Notification ; 9 import javax.management.NotificationFilter ; 10 import javax.management.NotificationListener ; 11 import javax.management.ObjectName ; 12 import javax.management.remote.NotificationResult ; 13 import javax.management.remote.TargetedNotification ; 14 import org.jboss.logging.Logger; 15 16 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 17 18 21 public class ClientNotificationProxy implements NotificationListener 22 { 23 private SynchronizedInt idCounter = new SynchronizedInt(0); 24 private Map listenerHolders = new HashMap (); 25 private List clientListenerNotifications = new ArrayList (); 26 27 private long startSequence = 0; 28 private long currentSequence = 0; 29 30 public static final int DEFAULT_MAX_NOTIFICATION_BUFFER_SIZE = 1024; 31 public static final String MAX_NOTIFICATION_BUFFER_SIZE_KEY = "jmx.remote.notification.buffer.size"; 32 33 private int maxNumberOfNotifications = DEFAULT_MAX_NOTIFICATION_BUFFER_SIZE; 34 35 private static final Logger log = Logger.getLogger(ClientNotificationProxy.class); 36 37 public ClientNotificationProxy() 38 { 39 String maxVal = System.getProperty(MAX_NOTIFICATION_BUFFER_SIZE_KEY); 40 if(maxVal != null && maxVal.length() > 0) 41 { 42 try 43 { 44 maxNumberOfNotifications = Integer.parseInt(maxVal); 45 } 46 catch(NumberFormatException e) 47 { 48 log.error("Could not convert max notification buffer size property value " + maxVal + " to a number. " + 49 "Will use default value of " + DEFAULT_MAX_NOTIFICATION_BUFFER_SIZE); 50 } 51 } 52 } 53 54 public Integer createListenerId(ObjectName name, NotificationFilter filter) 55 { 56 Integer id = new Integer (idCounter.increment()); 57 listenerHolders.put(id, new ClientListenerHolder(name, null, filter, id)); 58 return id; 59 } 60 61 public void handleNotification(Notification notification, Object o) 62 { 63 Integer id = (Integer ) o; 64 addClientNotification(id, notification); 65 } 66 67 private void addClientNotification(Integer id, Notification notification) 68 { 69 synchronized(clientListenerNotifications) 70 { 71 if(clientListenerNotifications.size() == maxNumberOfNotifications) 72 { 73 clientListenerNotifications.remove(0); 74 startSequence++; 75 } 76 TargetedNotification targetedNotification = new TargetedNotification (notification, id); 77 clientListenerNotifications.add(targetedNotification); 78 currentSequence++; 79 } 80 } 81 82 public NotificationFilter removeListener(Integer id) 83 { 84 ClientListenerHolder holder = (ClientListenerHolder) listenerHolders.remove(id); 85 return holder.getFilter(); 86 } 87 88 public NotificationResult fetchNotifications(long clientSequenceNumber, int maxNotifications, long timeout) 89 { 90 if(clientSequenceNumber < 0) 91 { 92 } 94 95 NotificationResult result = null; 96 boolean waitForTimeout = true; 97 boolean timeoutReached = false; 98 99 int startIndex = 0; 100 101 while(waitForTimeout) 102 { 103 synchronized(clientListenerNotifications) 104 { 105 waitForTimeout = false; 106 107 if(clientSequenceNumber > startSequence) 110 { 111 startIndex = (int) (clientSequenceNumber - startSequence); 112 113 if(startIndex > clientListenerNotifications.size()) 114 { 115 if(timeout > 0 && !timeoutReached) 116 { 117 try 119 { 120 clientListenerNotifications.wait(timeout); 121 waitForTimeout = true; 122 timeoutReached = true; 123 } 124 catch(InterruptedException e) 125 { 126 log.debug("Caught InterruptedException waiting for clientListenerNotifications."); 127 } 128 } 129 else 130 { 131 startIndex = clientListenerNotifications.size(); 132 } 133 } 134 } 135 136 int endIndex = maxNotifications > (clientListenerNotifications.size() - startIndex) ? clientListenerNotifications.size() : maxNotifications; 137 138 if(endIndex == startIndex) 140 { 141 if(timeout > 0 && !timeoutReached) 142 { 143 try 145 { 146 clientListenerNotifications.wait(timeout); 147 waitForTimeout = true; 148 timeoutReached = true; 149 } 150 catch(InterruptedException e) 151 { 152 log.debug("Caught InterruptedException waiting for clientListenerNotifications."); 153 } 154 } 155 } 156 157 List fetchedNotifications = clientListenerNotifications.subList(startIndex, endIndex); 158 TargetedNotification [] targetedNotifications = (TargetedNotification []) fetchedNotifications.toArray(new TargetedNotification [fetchedNotifications.size()]); 159 160 result = new NotificationResult (clientSequenceNumber, currentSequence, targetedNotifications); 161 } 162 } 163 164 return result; 165 } 166 167 public ClientListenerHolder[] getListeners() 168 { 169 Collection holders = listenerHolders.values(); 170 ClientListenerHolder[] holderArray = (ClientListenerHolder[])holders.toArray(new ClientListenerHolder[holders.size()]); 171 return holderArray; 172 } 173 174 } 175 | Popular Tags |