package mx4j.remote;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.remote.NotificationResult;
import javax.management.remote.TargetedNotification;

import mx4j.log.Log;
import mx4j.log.Logger;

/**
 * Base implementation of {@link RemoteNotificationClientHandler}.
 * <p>
 * Two daemon threads cooperate here:
 * <ul>
 * <li>a <em>fetcher</em> thread that repeatedly calls
 * {@link #fetchNotifications(long, int, long)} and hands the result to the deliverer;</li>
 * <li>a <em>deliverer</em> thread that takes notifications off an internal queue and
 * dispatches each one to the listener registered under its listener ID.</li>
 * </ul>
 * Subclasses only supply the actual fetch operation.
 */
public abstract class AbstractRemoteNotificationClientHandler implements RemoteNotificationClientHandler
{
   // Counters used only to give the worker threads unique names.
   private static int fetcherID;
   private static int delivererID;

   private final ConnectionNotificationEmitter emitter;
   private final HeartBeat heartbeat;
   // Maps listener ID (Integer) -> NotificationTuple; every access is guarded by synchronized (tuples).
   private final Map tuples = new HashMap();
   private NotificationFetcherThread fetcherThread;
   private NotificationDelivererThread delivererThread;

   /**
    * Creates a new handler; no thread is started until {@link #start()} is invoked
    * (directly, or indirectly through {@link #addNotificationListener}).
    *
    * @param emitter     receives the "notifications lost" events raised by this handler
    * @param heartbeat   supplies the retry period and the maximum number of retries used when fetching fails
    * @param environment optional (may be null) configuration map; the entries read are
    *                    {@link MX4JRemoteConstants#FETCH_NOTIFICATIONS_TIMEOUT} (Long),
    *                    {@link MX4JRemoteConstants#FETCH_NOTIFICATIONS_MAX_NUMBER} (Integer),
    *                    {@link MX4JRemoteConstants#FETCH_NOTIFICATIONS_SLEEP} (Integer) and
    *                    {@link MX4JRemoteConstants#NOTIFICATION_QUEUE_CAPACITY} (Integer)
    */
   protected AbstractRemoteNotificationClientHandler(ConnectionNotificationEmitter emitter, HeartBeat heartbeat, Map environment)
   {
      this.emitter = emitter;
      this.heartbeat = heartbeat;
      this.fetcherThread = new NotificationFetcherThread(environment);
      this.delivererThread = new NotificationDelivererThread(environment);
   }

   /**
    * Returns whether the fetcher is currently marked as active.
    *
    * @return true if the fetcher has been started and not yet stopped
    */
   public boolean isActive()
   {
      return fetcherThread.isActive();
   }

   /**
    * Starts the deliverer and then the fetcher thread; does nothing if already active.
    */
   public void start()
   {
      if (isActive()) return;
      // The deliverer goes first so that it is ready when the fetcher produces its first batch.
      delivererThread.start();
      fetcherThread.start();
   }

   /**
    * Stops both worker threads and forgets every registered listener; does nothing if not active.
    */
   public void stop()
   {
      if (!isActive()) return;
      fetcherThread.stop();
      delivererThread.stop();
      synchronized (tuples)
      {
         tuples.clear();
      }
   }

   private synchronized static int getFetcherID()
   {
      return ++fetcherID;
   }

   private synchronized static int getDelivererID()
   {
      return ++delivererID;
   }

   /**
    * Tells whether the given tuple is registered under any listener ID.
    *
    * @param tuple the tuple to look for
    * @return true if at least one registered tuple equals the given one
    */
   public boolean contains(NotificationTuple tuple)
   {
      synchronized (tuples)
      {
         return tuples.containsValue(tuple);
      }
   }

   /**
    * Registers the tuple under the given listener ID, starting this handler first if it is not active.
    *
    * @param id    the listener ID used as key
    * @param tuple the listener information to register
    */
   public void addNotificationListener(Integer id, NotificationTuple tuple)
   {
      if (!isActive()) start();

      synchronized (tuples)
      {
         tuples.put(id, tuple);
      }

      Logger logger = getLogger();
      if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Adding remote NotificationListener " + tuple);
   }

   /**
    * Collects every listener ID whose registered tuple equals the given one.
    *
    * @param tuple the tuple to look for
    * @return the matching IDs, or null (not an empty array) when there is no match
    */
   public Integer[] getNotificationListeners(NotificationTuple tuple)
   {
      synchronized (tuples)
      {
         List ids = new ArrayList();
         for (Iterator i = tuples.entrySet().iterator(); i.hasNext();)
         {
            Map.Entry entry = (Map.Entry)i.next();
            if (entry.getValue().equals(tuple)) ids.add(entry.getKey());
         }
         if (!ids.isEmpty()) return (Integer[])ids.toArray(new Integer[ids.size()]);
      }
      return null;
   }

   /**
    * Finds one listener ID whose registered tuple equals the given one.
    *
    * @param tuple the tuple to look for
    * @return the first matching ID in the map's iteration order, or null when there is no match
    */
   public Integer getNotificationListener(NotificationTuple tuple)
   {
      synchronized (tuples)
      {
         for (Iterator i = tuples.entrySet().iterator(); i.hasNext();)
         {
            Map.Entry entry = (Map.Entry)i.next();
            if (entry.getValue().equals(tuple)) return (Integer)entry.getKey();
         }
      }
      return null;
   }

   /**
    * Unregisters the listeners stored under the given IDs; IDs that are not registered are skipped.
    *
    * @param ids the listener IDs to remove
    */
   public void removeNotificationListeners(Integer[] ids)
   {
      Logger logger = getLogger();
      synchronized (tuples)
      {
         for (int i = 0; i < ids.length; ++i)
         {
            Integer id = ids[i];
            NotificationTuple tuple = (NotificationTuple)tuples.remove(id);
            if (tuple != null && logger.isEnabledFor(Logger.DEBUG)) logger.debug("Removing remote NotificationListener " + tuple);
         }
      }
   }

   /**
    * Performs the actual fetch of notifications; invoked repeatedly from the fetcher thread.
    *
    * @param sequence  the sequence number to fetch from (the fetcher passes -1 on its first call after a start)
    * @param maxNumber the maximum number of notifications requested
    * @param timeout   the timeout passed through from the configuration (presumably milliseconds - confirm against implementations)
    * @return the fetch result; a null result is tolerated by the caller
    * @throws IOException if the fetch fails; the caller retries according to
    *                     {@link #getRetryPeriod()} and {@link #getMaxRetries()}
    */
   protected abstract NotificationResult fetchNotifications(long sequence, int maxNumber, long timeout) throws IOException;

   /**
    * Returns the time slept between two fetch attempts after an IOException.
    *
    * @return the heartbeat's pulse period; it is passed to {@link Thread#sleep(long)}, hence milliseconds
    */
   protected long getRetryPeriod()
   {
      return heartbeat.getPulsePeriod();
   }

   /**
    * Returns how many times a failed fetch is retried before giving up.
    *
    * @return the heartbeat's maximum number of retries
    */
   protected int getMaxRetries()
   {
      return heartbeat.getMaxRetries();
   }

   /**
    * Reports, through the emitter, that notifications have been lost.
    *
    * @param number the count of lost notifications to report
    */
   protected void sendConnectionNotificationLost(long number)
   {
      emitter.sendConnectionNotificationLost(number);
   }

   /**
    * Returns the number of notifications currently queued for delivery.
    *
    * @return the size of the deliverer's queue
    */
   protected int getNotificationsCount()
   {
      return delivererThread.getNotificationsCount();
   }

   private int deliverNotifications(TargetedNotification[] notifications)
   {
      return delivererThread.addNotifications(notifications);
   }

   /**
    * Dispatches one notification to the listener registered under its listener ID.
    * Applies the tuple's filter first when the tuple asks for it; Throwables raised by the
    * filter or by the listener are logged and never propagated.
    */
   private void sendNotification(TargetedNotification notification)
   {
      NotificationTuple tuple = null;
      synchronized (tuples)
      {
         tuple = (NotificationTuple)tuples.get(notification.getListenerID());
      }

      // The listener may have been removed while the notification was waiting in the queue.
      if (tuple == null) return;

      Notification notif = notification.getNotification();

      Logger logger = getLogger();

      if (tuple.getInvokeFilter())
      {
         NotificationFilter filter = tuple.getNotificationFilter();
         if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Filtering notification " + notif + ", filter = " + filter);
         if (filter != null)
         {
            try
            {
               boolean deliver = filter.isNotificationEnabled(notif);
               if (!deliver) return;
            }
            catch (Throwable x)
            {
               // A failing filter does not veto delivery: log and fall through to the listener.
               logger.warn("Throwable caught from isNotificationEnabled, filter = " + filter, x);
            }
         }
      }

      if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Sending Notification " + notif + ", listener info is " + tuple);

      NotificationListener listener = tuple.getNotificationListener();

      try
      {
         listener.handleNotification(notif, tuple.getHandback());
      }
      catch (Throwable x)
      {
         // User code must not be able to kill the deliverer thread.
         logger.warn("Throwable caught from handleNotification, listener = " + listener, x);
      }
   }

   /**
    * Returns the logger used by this handler, named after the concrete class.
    */
   protected Logger getLogger()
   {
      return Log.getLogger(getClass().getName());
   }

   /**
    * Runnable that polls for notifications and passes them to the deliverer.
    */
   private class NotificationFetcherThread implements Runnable
   {
      private long sequenceNumber;
      private volatile boolean active;
      private Thread thread;
      private long timeout;
      private int maxNumber;
      private long sleep;

      /**
       * Reads the fetch parameters from the environment. Entries that are missing or of the
       * wrong type are ignored and the defaults below are kept.
       */
      private NotificationFetcherThread(Map environment)
      {
         // Defaults used when the environment does not override them.
         timeout = 60 * 1000;
         maxNumber = 25;
         sleep = 0;
         if (environment != null)
         {
            Object value = environment.get(MX4JRemoteConstants.FETCH_NOTIFICATIONS_TIMEOUT);
            if (value instanceof Long) timeout = ((Long)value).longValue();

            value = environment.get(MX4JRemoteConstants.FETCH_NOTIFICATIONS_MAX_NUMBER);
            if (value instanceof Integer) maxNumber = ((Integer)value).intValue();

            value = environment.get(MX4JRemoteConstants.FETCH_NOTIFICATIONS_SLEEP);
            if (value instanceof Integer) sleep = ((Integer)value).intValue();
         }
      }

      private synchronized long getSequenceNumber()
      {
         return sequenceNumber;
      }

      private synchronized void setSequenceNumber(long sequenceNumber)
      {
         this.sequenceNumber = sequenceNumber;
      }

      private boolean isActive()
      {
         return active;
      }

      /**
       * Tells whether the calling thread is the thread most recently created by start().
       */
      private synchronized boolean isCurrentThread()
      {
         return thread == Thread.currentThread();
      }

      private synchronized void start()
      {
         active = true;
         // A negative sequence marks the first fetch: run() skips loss detection for it.
         sequenceNumber = -1;
         thread = new Thread(this, "Notification Fetcher #" + getFetcherID());
         thread.setDaemon(true);
         thread.start();
      }

      private synchronized void stop()
      {
         active = false;
         thread.interrupt();
      }

      public void run()
      {
         Logger logger = getLogger();
         try
         {
            // Check the interrupt status of the running thread itself, not of the 'thread' field:
            // after a stop()/start() cycle the field refers to the new thread, and a superseded
            // thread must not keep running just because its replacement has not been interrupted.
            while (isActive() && !Thread.currentThread().isInterrupted())
            {
               try
               {
                  long sequence = getSequenceNumber();
                  NotificationResult result = fetchNotifications(sequence, maxNumber, timeout);
                  if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Fetched Notifications: " + result);

                  long sleepTime = sleep;
                  if (result != null)
                  {
                     long nextSequence = result.getNextSequenceNumber();
                     TargetedNotification[] targeted = result.getTargetedNotifications();
                     int targetedLength = targeted == null ? 0 : targeted.length;
                     // Loss detection only applies once a real sequence number is known (sequence >= 0).
                     boolean notifsFilteredByServer = sequence >= 0 ? nextSequence - sequence != targetedLength : false;
                     boolean notifsLostByServer = sequence >= 0 && result.getEarliestSequenceNumber() > sequence;
                     if (notifsFilteredByServer)
                     {
                        sendConnectionNotificationLost(nextSequence - sequence - targetedLength);
                     }
                     if (notifsLostByServer)
                     {
                        sendConnectionNotificationLost(result.getEarliestSequenceNumber() - sequence);
                     }

                     setSequenceNumber(nextSequence);
                     int delivered = deliverNotifications(targeted);
                     if (delivered < targetedLength)
                     {
                        // The delivery queue was full: what did not fit is reported as lost.
                        sendConnectionNotificationLost(targetedLength - delivered);
                     }

                     // A full batch suggests more are waiting, so fetch again without sleeping.
                     if (targeted != null && targeted.length == maxNumber) sleepTime = 0;
                  }

                  if (sleepTime > 0) Thread.sleep(sleepTime);
               }
               catch (IOException x)
               {
                  if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Caught IOException from fetchNotifications", x);
                  break;
               }
               catch (InterruptedException x)
               {
                  Thread.currentThread().interrupt();
                  break;
               }
               catch (Throwable x)
               {
                  if (logger.isEnabledFor(Logger.WARN)) logger.warn("Caught an unexpected exception", x);
               }
            }
         }
         finally
         {
            // Only the current fetcher may shut the handler down: a superseded thread that exits
            // late must not stop the threads created by a subsequent start().
            if (isCurrentThread()) AbstractRemoteNotificationClientHandler.this.stop();
            if (logger.isEnabledFor(Logger.DEBUG)) logger.debug(Thread.currentThread().getName() + " Thread exited");
         }
      }

      /**
       * Calls the enclosing handler's fetchNotifications, retrying on IOException.
       * After a failure it sleeps {@link #getRetryPeriod()} and tries again, up to
       * {@link #getMaxRetries()} retries; when the retries are exhausted the last
       * IOException is rethrown immediately, without a further sleep.
       *
       * @throws IOException          if all the attempts failed
       * @throws InterruptedException if interrupted while sleeping between attempts
       */
      private NotificationResult fetchNotifications(long sequence, int maxNumber, long timeout) throws IOException, InterruptedException
      {
         Logger logger = getLogger();
         int retries = 0;
         while (true)
         {
            if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Fetching notifications, sequence is " + sequence + ", timeout is " + timeout);
            try
            {
               return AbstractRemoteNotificationClientHandler.this.fetchNotifications(sequence, maxNumber, timeout);
            }
            catch (IOException x)
            {
               int remaining = getMaxRetries() - retries;
               // Give up before sleeping; '<=' also terminates when the retry limit is negative.
               if (remaining <= 0) throw x;
               if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Could not fetch notifications, sleeping " + getRetryPeriod() + " and trying " + remaining + " more times", x);
               Thread.sleep(getRetryPeriod());
               ++retries;
            }
         }
      }
   }

   /**
    * Runnable that takes queued notifications, one at a time, and dispatches them to listeners.
    */
   private class NotificationDelivererThread implements Runnable
   {
      // Guarded by synchronized (this).
      private final List notificationQueue = new LinkedList();
      // Maximum queue size; 0 means that the queue is unbounded.
      private int capacity;
      private volatile boolean active;
      private Thread thread;

      /**
       * Reads the queue capacity from the environment; a missing, non-Integer or negative
       * value leaves the queue unbounded.
       */
      private NotificationDelivererThread(Map environment)
      {
         if (environment != null)
         {
            Object size = environment.get(MX4JRemoteConstants.NOTIFICATION_QUEUE_CAPACITY);
            if (size instanceof Integer)
            {
               capacity = ((Integer)size).intValue();
               if (capacity < 0) capacity = 0;
            }
         }
      }

      /**
       * Enqueues the given notifications for delivery, as many as the capacity allows.
       *
       * @param notifications the notifications to enqueue, may be null or empty
       * @return how many notifications were actually enqueued
       */
      private int addNotifications(TargetedNotification[] notifications)
      {
         if (notifications == null || notifications.length == 0) return 0;

         List notifs = Arrays.asList(notifications);

         Logger logger = getLogger();
         if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Enqueuing notifications for delivery: " + notifs);

         synchronized (this)
         {
            int size = notifs.size();
            int added = size;
            if (capacity > 0)
            {
               int room = capacity - notificationQueue.size();
               if (room < size)
               {
                  added = room;
                  if (logger.isEnabledFor(Logger.DEBUG)) logger.debug("Notification queue is full, enqueued " + room + " notifications out of " + size + ", exceeding will be lost");
               }
               notificationQueue.addAll(notifs.subList(0, added));
            }
            else
            {
               notificationQueue.addAll(notifs);
            }
            // Wake up the deliverer, which may be waiting on an empty queue.
            notifyAll();
            return added;
         }
      }

      private boolean isActive()
      {
         return active;
      }

      private synchronized void start()
      {
         active = true;
         notificationQueue.clear();
         thread = new Thread(this, "Notification Deliverer #" + getDelivererID());
         thread.setDaemon(true);
         thread.start();
      }

      private synchronized void stop()
      {
         active = false;
         thread.interrupt();
      }

      public void run()
      {
         Logger logger = getLogger();
         try
         {
            // As in the fetcher, test the running thread's own interrupt status rather than the
            // 'thread' field, which may already refer to a replacement thread.
            while (isActive() && !Thread.currentThread().isInterrupted())
            {
               try
               {
                  TargetedNotification notification = null;
                  synchronized (this)
                  {
                     while (notificationQueue.isEmpty()) wait();
                     notification = (TargetedNotification)notificationQueue.remove(0);
                  }
                  // Dispatch outside the lock, so that listener code cannot block addNotifications().
                  sendNotification(notification);
               }
               catch (InterruptedException x)
               {
                  Thread.currentThread().interrupt();
                  break;
               }
               catch (Throwable x)
               {
                  if (logger.isEnabledFor(Logger.WARN)) logger.warn("Caught an unexpected exception", x);
               }
            }
         }
         finally
         {
            synchronized (this)
            {
               // A superseded thread that exits late must not deactivate its replacement.
               if (thread == Thread.currentThread()) active = false;
            }
            if (logger.isEnabledFor(Logger.DEBUG)) logger.debug(Thread.currentThread().getName() + " Thread exited");
         }
      }

      private int getNotificationsCount()
      {
         synchronized (this)
         {
            return notificationQueue.size();
         }
      }
   }
}