1 8 9 package mx4j.remote; 10 11 import java.io.IOException ; 12 import java.util.Arrays ; 13 import java.util.HashMap ; 14 import java.util.LinkedList ; 15 import java.util.List ; 16 import java.util.Map ; 17 18 import javax.management.Notification ; 19 import javax.management.NotificationFilter ; 20 import javax.management.NotificationListener ; 21 import javax.management.ObjectName ; 22 import javax.management.remote.NotificationResult ; 23 import javax.management.remote.TargetedNotification ; 24 25 import mx4j.log.Log; 26 import mx4j.log.Logger; 27 28 33 public class DefaultRemoteNotificationServerHandler implements RemoteNotificationServerHandler 34 { 35 private static int listenerID; 36 37 private final NotificationListener listener; 38 private final Map tuples = new HashMap (); 39 private final NotificationBuffer buffer; 40 private volatile boolean closed; 41 42 49 public DefaultRemoteNotificationServerHandler(Map environment) 50 { 51 listener = new ServerListener(); 52 buffer = new NotificationBuffer(environment); 53 } 54 55 public Integer generateListenerID(ObjectName name, NotificationFilter filter) 56 { 57 synchronized (DefaultRemoteNotificationServerHandler.class) 58 { 59 return new Integer (++listenerID); 60 } 61 } 62 63 public NotificationListener getServerNotificationListener() 64 { 65 return listener; 66 } 67 68 public void addNotificationListener(Integer id, NotificationTuple tuple) 69 { 70 if (closed) return; 71 synchronized (tuples) 72 { 73 tuples.put(id, tuple); 74 } 75 } 76 77 public NotificationTuple removeNotificationListener(Integer id) 78 { 79 if (closed) return null; 80 synchronized (tuples) 81 { 82 return (NotificationTuple)tuples.remove(id); 83 } 84 } 85 86 public NotificationResult fetchNotifications(long sequenceNumber, int maxNotifications, long timeout) throws IOException 87 { 88 if (closed) throw new IOException ("RemoteNotificationServerHandler is closed"); 89 return buffer.getNotifications(sequenceNumber, maxNotifications, timeout); 90 } 91 92 public NotificationTuple[] close() 93 { 94 Logger logger = getLogger(); 95 closed = true; 96 stopWaitingForNotifications(buffer); 97 synchronized (tuples) 98 { 99 NotificationTuple[] result = (NotificationTuple[])tuples.values().toArray(new NotificationTuple[tuples.size()]); 100 tuples.clear(); 101 if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("RemoteNotificationServerHandler closed, returning: " + Arrays.asList(result)); 102 return result; 103 } 104 } 105 106 113 private void stopWaitingForNotifications(Object lock) 114 { 115 synchronized (lock) 116 { 117 lock.notifyAll(); 118 } 119 } 120 121 132 protected boolean waitForNotifications(Object lock, long timeout) 133 { 134 Logger logger = getLogger(); 135 long start = 0; 136 if (logger.isEnabledFor(Logger.DEBUG)) 137 { 138 logger.debug("Waiting for notifications " + timeout + " ms"); 139 start = System.currentTimeMillis(); 140 } 141 142 synchronized (lock) 143 { 144 try 145 { 146 lock.wait(timeout); 147 } 148 catch (InterruptedException x) 149 { 150 Thread.currentThread().interrupt(); 151 } 152 } 153 154 if (logger.isEnabledFor(Logger.DEBUG)) 155 { 156 long elapsed = System.currentTimeMillis() - start; 157 logger.debug("Waited for notifications " + elapsed + " ms"); 158 } 159 160 return true; 161 } 162 163 170 protected TargetedNotification [] filterNotifications(TargetedNotification [] notifications) 171 { 172 return notifications; 173 } 174 175 private void addNotification(Integer id, Notification notification) 176 { 177 buffer.add(new TargetedNotification (notification, id)); 178 } 179 180 protected Logger getLogger() 181 { 182 return Log.getLogger(getClass().getName()); 183 } 184 185 private class ServerListener implements NotificationListener 186 { 187 public void handleNotification(Notification notification, Object handback) 188 { 189 Integer id = (Integer )handback; 190 addNotification(id, notification); 191 } 192 } 193 194 private class NotificationBuffer 195 { 196 private final List notifications = new LinkedList (); 197 private int maxCapacity; 198 private int purgeDistance; 199 private long firstSequence; 200 private long lastSequence; 201 private long lowestExpectedSequence = -1; 202 203 private NotificationBuffer(Map environment) 204 { 205 if (environment != null) 206 { 207 try 208 { 209 Integer maxCapacityInteger = (Integer )environment.get(MX4JRemoteConstants.NOTIFICATION_BUFFER_CAPACITY); 210 if (maxCapacityInteger != null) maxCapacity = maxCapacityInteger.intValue(); 211 } 212 catch (Exception ignored) 213 { 214 } 215 216 try 217 { 218 Integer purgeDistanceInteger = (Integer )environment.get(MX4JRemoteConstants.NOTIFICATION_PURGE_DISTANCE); 219 if (purgeDistanceInteger != null) purgeDistance = purgeDistanceInteger.intValue(); 220 } 221 catch (Exception ignored) 222 { 223 } 224 } 225 if (maxCapacity <= 0) maxCapacity = 1024; 226 if (purgeDistance <= 0) purgeDistance = 128; 227 } 228 229 private int getSize() 230 { 231 synchronized (this) 232 { 233 return notifications.size(); 234 } 235 } 236 237 private void add(TargetedNotification notification) 238 { 239 Logger logger = getLogger(); 240 synchronized (this) 241 { 242 if (notifications.size() == maxCapacity) 243 { 244 if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Notification buffer full: " + this); 245 removeRange(0, 1); 246 } 247 notifications.add(notification); 248 ++lastSequence; 249 if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Notification added to buffer: " + this); 250 notifyAll(); 251 } 252 } 253 254 private void removeRange(int start, int end) 255 { 256 synchronized (this) 257 { 258 notifications.subList(start, end).clear(); 259 firstSequence += end - start; 260 } 261 } 262 263 private long getFirstSequenceNumber() 264 { 265 synchronized (this) 266 { 267 return firstSequence; 268 } 269 } 270 271 private long getLastSequenceNumber() 272 { 273 synchronized (this) 274 { 275 return lastSequence; 276 } 277 } 278 279 private NotificationResult getNotifications(long sequenceNumber, int maxNotifications, long timeout) 280 { 281 Logger logger = getLogger(); 282 synchronized (this) 283 { 284 NotificationResult result = null; 285 int size = 0; 286 if (sequenceNumber < 0) 287 { 288 long sequence = getLastSequenceNumber(); 290 size = new Long (sequence + 1).intValue(); 291 result = new NotificationResult (getFirstSequenceNumber(), sequence, new TargetedNotification [0]); 292 if (lowestExpectedSequence < 0) lowestExpectedSequence = sequence; 293 if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("First fetchNotification call: " + this + ", returning " + result); 294 } 295 else 296 { 297 long firstSequence = getFirstSequenceNumber(); 298 299 int losts = 0; 300 int start = new Long (sequenceNumber - firstSequence).intValue(); 301 if (start < 0) 305 { 306 losts = -start; 307 start = 0; 308 } 309 310 List sublist = null; 311 boolean send = false; 312 while (size == 0) 313 { 314 int end = notifications.size(); 315 if (end - start > maxNotifications) end = start + maxNotifications; 316 317 sublist = notifications.subList(start, end); 318 size = sublist.size(); 319 320 if (closed || send) break; 321 322 if (size == 0) 323 { 324 if (timeout <= 0) break; 325 if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("No notifications to send, waiting " + timeout + " ms"); 326 327 send = waitForNotifications(this, timeout); 331 } 332 } 333 334 TargetedNotification [] notifications = (TargetedNotification [])sublist.toArray(new TargetedNotification [size]); 335 notifications = filterNotifications(notifications); 336 result = new NotificationResult (firstSequence, sequenceNumber + losts + size, notifications); 337 if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Non-first fetchNotification call: " + this + ", returning " + result); 338 339 int purged = purgeNotifications(sequenceNumber, size); 340 if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Purged " + purged + " notifications: " + this); 341 } 342 return result; 343 } 344 } 345 346 private int purgeNotifications(long sequenceNumber, int size) 347 { 348 357 363 int result = 0; 364 synchronized (this) 365 { 366 if (sequenceNumber <= lowestExpectedSequence) 367 { 368 long lowest = Math.min(lowestExpectedSequence, sequenceNumber); 369 370 long firstSequence = getFirstSequenceNumber(); 371 if (lowest - firstSequence > purgeDistance) 372 { 373 int purgeSize = purgeDistance >> 1; 375 removeRange(0, purgeSize); 376 result = purgeSize; 377 } 378 379 long expected = Math.max(sequenceNumber + size, firstSequence); 380 lowestExpectedSequence = expected; 381 } 382 } 383 return result; 384 } 385 386 public String toString() 387 { 388 StringBuffer buffer = new StringBuffer ("NotificationBuffer@"); 389 buffer.append(Integer.toHexString(hashCode())).append("["); 390 buffer.append("first=").append(getFirstSequenceNumber()).append(", "); 391 buffer.append("last=").append(getLastSequenceNumber()).append(", "); 392 buffer.append("size=").append(getSize()).append(", "); 393 buffer.append("lowestExpected=").append(lowestExpectedSequence).append(", "); 394 buffer.append("maxCapacity=").append(maxCapacity).append(", "); 395 buffer.append("purgeDistance=").append(purgeDistance).append("]"); 396 return buffer.toString(); 397 } 398 } 399 } 400 | Popular Tags |