1 19 20 package org.apache.cayenne.access; 21 22 import java.io.IOException ; 23 import java.io.ObjectInputStream ; 24 import java.io.Serializable ; 25 import java.util.Collection ; 26 import java.util.Collections ; 27 import java.util.HashMap ; 28 import java.util.Iterator ; 29 import java.util.List ; 30 import java.util.Map ; 31 32 import org.apache.cayenne.CayenneRuntimeException; 33 import org.apache.cayenne.DataObject; 34 import org.apache.cayenne.DataRow; 35 import org.apache.cayenne.ObjectId; 36 import org.apache.cayenne.PersistenceState; 37 import org.apache.cayenne.Persistent; 38 import org.apache.cayenne.access.event.SnapshotEvent; 39 import org.apache.cayenne.event.EventBridge; 40 import org.apache.cayenne.event.EventBridgeFactory; 41 import org.apache.cayenne.event.EventManager; 42 import org.apache.cayenne.event.EventSubject; 43 import org.apache.commons.collections.ExtendedProperties; 44 import org.apache.commons.collections.map.LRUMap; 45 import org.apache.commons.logging.Log; 46 import org.apache.commons.logging.LogFactory; 47 48 58 public class DataRowStore implements Serializable { 59 60 private static Log logger = LogFactory.getLog(DataRowStore.class); 61 62 public static final String SNAPSHOT_EXPIRATION_PROPERTY = "cayenne.DataRowStore.snapshot.expiration"; 64 public static final String SNAPSHOT_CACHE_SIZE_PROPERTY = "cayenne.DataRowStore.snapshot.size"; 65 public static final String REMOTE_NOTIFICATION_PROPERTY = "cayenne.DataRowStore.remote.notify"; 66 public static final String EVENT_BRIDGE_FACTORY_PROPERTY = "cayenne.DataRowStore.EventBridge.factory"; 67 68 70 public static final long SNAPSHOT_EXPIRATION_DEFAULT = 2 * 60 * 60; 72 public static final int SNAPSHOT_CACHE_SIZE_DEFAULT = 10000; 73 public static final boolean REMOTE_NOTIFICATION_DEFAULT = false; 74 75 public static final String EVENT_BRIDGE_FACTORY_DEFAULT = "org.apache.cayenne.event.JavaGroupsBridgeFactory"; 78 79 protected String name; 80 protected LRUMap snapshots; 81 protected boolean notifyingRemoteListeners; 82 83 protected transient EventManager eventManager; 84 protected transient EventBridge remoteNotificationsHandler; 85 86 protected transient EventSubject eventSubject; 89 90 96 public DataRowStore(String name) { 97 this(name, Collections.EMPTY_MAP); 98 } 99 100 111 public DataRowStore(String name, Map properties) { 112 this(name, properties, new EventManager()); 113 } 114 115 127 public DataRowStore(String name, Map properties, EventManager eventManager) { 128 if (name == null) { 129 throw new IllegalArgumentException ("DataRowStore name can't be null."); 130 } 131 132 this.name = name; 133 this.eventSubject = createSubject(); 134 this.eventManager = eventManager; 135 initWithProperties(properties); 136 } 137 138 private EventSubject createSubject() { 139 return EventSubject.getSubject(this.getClass(), name); 140 } 141 142 protected void initWithProperties(Map properties) { 143 ExtendedProperties propertiesWrapper = new ExtendedProperties(); 144 145 if (properties != null) { 146 propertiesWrapper.putAll(properties); 147 } 148 149 long snapshotsExpiration = propertiesWrapper.getLong( 150 SNAPSHOT_EXPIRATION_PROPERTY, 151 SNAPSHOT_EXPIRATION_DEFAULT); 152 153 int snapshotsCacheSize = propertiesWrapper.getInt( 154 SNAPSHOT_CACHE_SIZE_PROPERTY, 155 SNAPSHOT_CACHE_SIZE_DEFAULT); 156 157 boolean notifyRemote = propertiesWrapper.getBoolean( 158 REMOTE_NOTIFICATION_PROPERTY, 159 REMOTE_NOTIFICATION_DEFAULT); 160 161 String eventBridgeFactory = propertiesWrapper.getString( 162 EVENT_BRIDGE_FACTORY_PROPERTY, 163 EVENT_BRIDGE_FACTORY_DEFAULT); 164 165 if (logger.isDebugEnabled()) { 166 logger.debug("DataRowStore property " 167 + SNAPSHOT_EXPIRATION_PROPERTY 168 + " = " 169 + snapshotsExpiration); 170 logger.debug("DataRowStore property " 171 + SNAPSHOT_CACHE_SIZE_PROPERTY 172 + " = " 173 + snapshotsCacheSize); 174 logger.debug("DataRowStore property " 175 + REMOTE_NOTIFICATION_PROPERTY 176 + " = " 177 + notifyRemote); 178 logger.debug("DataRowStore property " 179 + EVENT_BRIDGE_FACTORY_PROPERTY 180 + " = " 181 + eventBridgeFactory); 182 } 183 184 this.notifyingRemoteListeners = notifyRemote; 186 187 this.snapshots = new LRUMap(snapshotsCacheSize); 189 190 if (notifyingRemoteListeners) { 192 try { 193 EventBridgeFactory factory = (EventBridgeFactory) Class.forName( 194 eventBridgeFactory).newInstance(); 195 196 Collection subjects = Collections.singleton(getSnapshotEventSubject()); 197 String externalSubject = EventBridge 198 .convertToExternalSubject(getSnapshotEventSubject()); 199 this.remoteNotificationsHandler = factory.createEventBridge( 200 subjects, 201 externalSubject, 202 properties); 203 } 204 catch (Exception ex) { 205 throw new CayenneRuntimeException("Error initializing DataRowStore.", ex); 206 } 207 208 startListeners(); 209 } 210 } 211 212 217 void snapshotsUpdatedForObjects(List objects, List snapshots, boolean refresh) { 218 219 int size = objects.size(); 220 221 if (size != snapshots.size()) { 223 throw new IllegalArgumentException ( 224 "Counts of objects and corresponding snapshots do not match. " 225 + "Objects count: " 226 + objects.size() 227 + ", snapshots count: " 228 + snapshots.size()); 229 } 230 231 Map modified = null; 232 Object eventPostedBy = null; 233 234 synchronized (this) { 235 for (int i = 0; i < size; i++) { 236 Persistent object = (Persistent) objects.get(i); 237 238 if (object.getPersistenceState() == PersistenceState.HOLLOW) { 240 continue; 241 } 242 243 ObjectId oid = object.getObjectId(); 244 245 248 DataRow cachedSnapshot = (DataRow) this.snapshots.get(oid); 249 if (refresh || cachedSnapshot == null) { 250 251 DataRow newSnapshot = (DataRow) snapshots.get(i); 252 253 if (cachedSnapshot != null) { 254 if (object instanceof DataObject 256 && cachedSnapshot.equals(newSnapshot)) { 257 ((DataObject) object).setSnapshotVersion(cachedSnapshot 258 .getVersion()); 259 continue; 260 } 261 else { 262 newSnapshot.setReplacesVersion(cachedSnapshot.getVersion()); 263 } 264 } 265 266 if (modified == null) { 267 modified = new HashMap (); 268 eventPostedBy = object.getObjectContext().getGraphManager(); 269 } 270 271 modified.put(oid, newSnapshot); 272 } 273 } 274 275 if (modified != null) { 276 processSnapshotChanges( 277 eventPostedBy, 278 modified, 279 Collections.EMPTY_LIST, 280 Collections.EMPTY_LIST, 281 Collections.EMPTY_LIST); 282 } 283 } 284 } 285 286 289 public int size() { 290 return snapshots.size(); 291 } 292 293 296 public int maximumSize() { 297 return snapshots.maxSize(); 298 } 299 300 303 public void shutdown() { 304 stopListeners(); 305 clear(); 306 } 307 308 312 public String getName() { 313 return name; 314 } 315 316 320 public void setName(String name) { 321 this.name = name; 322 } 323 324 329 public EventManager getEventManager() { 330 return eventManager; 331 } 332 333 338 public void setEventManager(EventManager eventManager) { 339 if (eventManager != this.eventManager) { 340 stopListeners(); 341 this.eventManager = eventManager; 342 startListeners(); 343 } 344 } 345 346 350 public synchronized DataRow getCachedSnapshot(ObjectId oid) { 351 return (DataRow) snapshots.get(oid); 352 } 353 354 357 public EventSubject getSnapshotEventSubject() { 358 return eventSubject; 359 } 360 361 364 public synchronized void clear() { 365 snapshots.clear(); 366 } 367 368 371 public synchronized void forgetSnapshot(ObjectId id) { 372 snapshots.remove(id); 373 } 374 375 379 public void processRemoteEvent(SnapshotEvent event) { 380 if (event.getSource() != remoteNotificationsHandler) { 381 return; 382 } 383 384 if (logger.isDebugEnabled()) { 385 logger.debug("remote event: " + event); 386 } 387 388 Collection deletedSnapshotIds = event.getDeletedIds(); 389 Collection invalidatedSnapshotIds = event.getInvalidatedIds(); 390 Map diffs = event.getModifiedDiffs(); 391 Collection indirectlyModifiedIds = event.getIndirectlyModifiedIds(); 392 393 if (deletedSnapshotIds.isEmpty() 394 && invalidatedSnapshotIds.isEmpty() 395 && diffs.isEmpty() 396 && indirectlyModifiedIds.isEmpty()) { 397 logger.warn("processRemoteEvent.. bogus call... no changes."); 398 return; 399 } 400 401 synchronized (this) { 402 processDeletedIDs(deletedSnapshotIds); 403 processInvalidatedIDs(deletedSnapshotIds); 404 processUpdateDiffs(diffs); 405 sendUpdateNotification( 406 event.getPostedBy(), 407 diffs, 408 deletedSnapshotIds, 409 invalidatedSnapshotIds, 410 indirectlyModifiedIds); 411 } 412 } 413 414 418 public void processSnapshotChanges( 419 Object postedBy, 420 Map updatedSnapshots, 421 Collection deletedSnapshotIds, 422 Collection invalidatedSnapshotIds, 423 Collection indirectlyModifiedIds) { 424 425 427 if (deletedSnapshotIds.isEmpty() 428 && invalidatedSnapshotIds.isEmpty() 429 && updatedSnapshots.isEmpty() 430 && indirectlyModifiedIds.isEmpty()) { 431 logger.warn("postSnapshotsChangeEvent.. bogus call... no changes."); 432 return; 433 } 434 435 synchronized (this) { 436 processDeletedIDs(deletedSnapshotIds); 437 processInvalidatedIDs(invalidatedSnapshotIds); 438 Map diffs = processUpdatedSnapshots(updatedSnapshots); 439 sendUpdateNotification( 440 postedBy, 441 diffs, 442 deletedSnapshotIds, 443 invalidatedSnapshotIds, 444 indirectlyModifiedIds); 445 } 446 } 447 448 private void processDeletedIDs(Collection deletedSnapshotIDs) { 449 if (!deletedSnapshotIDs.isEmpty()) { 451 Iterator it = deletedSnapshotIDs.iterator(); 452 while (it.hasNext()) { 453 snapshots.remove(it.next()); 454 } 455 } 456 } 457 458 private void processInvalidatedIDs(Collection invalidatedSnapshotIds) { 459 if (!invalidatedSnapshotIds.isEmpty()) { 461 Iterator it = invalidatedSnapshotIds.iterator(); 462 while (it.hasNext()) { 463 snapshots.remove(it.next()); 464 } 465 } 466 } 467 468 private Map processUpdatedSnapshots(Map updatedSnapshots) { 469 Map diffs = null; 470 471 if (!updatedSnapshots.isEmpty()) { 473 Iterator it = updatedSnapshots.entrySet().iterator(); 474 while (it.hasNext()) { 475 Map.Entry entry = (Map.Entry ) it.next(); 476 477 ObjectId key = (ObjectId) entry.getKey(); 478 DataRow newSnapshot = (DataRow) entry.getValue(); 479 DataRow oldSnapshot = (DataRow) snapshots.put(key, newSnapshot); 480 481 484 486 499 if (oldSnapshot != null) { 500 if (oldSnapshot.getVersion() != newSnapshot.getReplacesVersion()) { 503 logger 504 .debug("snapshot version changed, don't know what to do... Old: " 505 + oldSnapshot 506 + ", New: " 507 + newSnapshot); 508 forgetSnapshot(key); 509 continue; 510 } 511 512 Map diff = oldSnapshot.createDiff(newSnapshot); 513 514 if (diff != null) { 515 if (diffs == null) { 516 diffs = new HashMap (); 517 } 518 519 diffs.put(key, diff); 520 } 521 } 522 } 523 } 524 525 return diffs; 526 } 527 528 private void processUpdateDiffs(Map diffs) { 529 if (!diffs.isEmpty()) { 531 Iterator it = diffs.entrySet().iterator(); 532 while (it.hasNext()) { 533 Map.Entry entry = (Map.Entry ) it.next(); 534 ObjectId key = (ObjectId) entry.getKey(); 535 DataRow oldSnapshot = (DataRow) snapshots.remove(key); 536 537 if (oldSnapshot == null) { 538 continue; 539 } 540 541 DataRow newSnapshot = oldSnapshot.applyDiff((DataRow) entry.getValue()); 542 snapshots.put(key, newSnapshot); 543 } 544 } 545 } 546 547 private void sendUpdateNotification( 548 Object postedBy, 549 Map diffs, 550 Collection deletedSnapshotIDs, 551 Collection invalidatedSnapshotIDs, 552 Collection indirectlyModifiedIds) { 553 554 if ((diffs != null && !diffs.isEmpty()) 556 || (deletedSnapshotIDs != null && !deletedSnapshotIDs.isEmpty()) 557 || (invalidatedSnapshotIDs != null && !invalidatedSnapshotIDs.isEmpty()) 558 || (indirectlyModifiedIds != null && !indirectlyModifiedIds.isEmpty())) { 559 560 SnapshotEvent event = new SnapshotEvent( 561 this, 562 postedBy, 563 diffs, 564 deletedSnapshotIDs, 565 invalidatedSnapshotIDs, 566 indirectlyModifiedIds); 567 568 if (logger.isDebugEnabled()) { 569 logger.debug("postSnapshotsChangeEvent: " + event); 570 } 571 572 eventManager.postEvent(event, getSnapshotEventSubject()); 575 } 576 } 577 578 public boolean isNotifyingRemoteListeners() { 579 return notifyingRemoteListeners; 580 } 581 582 public void setNotifyingRemoteListeners(boolean notifyingRemoteListeners) { 583 this.notifyingRemoteListeners = notifyingRemoteListeners; 584 } 585 586 private void readObject(ObjectInputStream in) throws IOException , 588 ClassNotFoundException { 589 590 in.defaultReadObject(); 591 592 this.eventSubject = createSubject(); 594 } 595 596 void stopListeners() { 597 eventManager.removeListener(this); 598 if (remoteNotificationsHandler != null) { 599 try { 600 remoteNotificationsHandler.shutdown(); 601 } 602 catch (Exception ex) { 603 logger.info("Exception shutting down EventBridge.", ex); 604 } 605 remoteNotificationsHandler = null; 606 } 607 } 608 609 void startListeners() { 610 if (remoteNotificationsHandler != null) { 611 try { 612 618 eventManager.addNonBlockingListener( 619 this, 620 "processRemoteEvent", 621 SnapshotEvent.class, 622 getSnapshotEventSubject(), 623 remoteNotificationsHandler); 624 625 remoteNotificationsHandler.startup( 628 eventManager, 629 EventBridge.RECEIVE_LOCAL_EXTERNAL); 630 } 631 catch (Exception ex) { 632 throw new CayenneRuntimeException("Error initializing DataRowStore.", ex); 633 } 634 } 635 } 636 } 637 | Popular Tags |