1 56 package org.objectstyle.cayenne.access; 57 58 import java.io.IOException ; 59 import java.io.ObjectInputStream ; 60 import java.io.Serializable ; 61 import java.util.Collection ; 62 import java.util.Collections ; 63 import java.util.HashMap ; 64 import java.util.Iterator ; 65 import java.util.List ; 66 import java.util.Map ; 67 68 import org.apache.commons.collections.ExtendedProperties; 69 import org.apache.commons.collections.map.LRUMap; 70 import org.apache.log4j.Logger; 71 import org.objectstyle.cayenne.CayenneRuntimeException; 72 import org.objectstyle.cayenne.DataRow; 73 import org.objectstyle.cayenne.ObjectId; 74 import org.objectstyle.cayenne.access.event.SnapshotEvent; 75 import org.objectstyle.cayenne.access.util.QueryUtils; 76 import org.objectstyle.cayenne.access.util.SelectObserver; 77 import org.objectstyle.cayenne.event.EventBridge; 78 import org.objectstyle.cayenne.event.EventBridgeFactory; 79 import org.objectstyle.cayenne.event.EventManager; 80 import org.objectstyle.cayenne.event.EventSubject; 81 import org.objectstyle.cayenne.query.SelectQuery; 82 83 93 public class DataRowStore implements Serializable { 94 95 private static Logger logObj = Logger.getLogger(DataRowStore.class); 96 97 public static final String SNAPSHOT_EXPIRATION_PROPERTY = "cayenne.DataRowStore.snapshot.expiration"; 99 public static final String SNAPSHOT_CACHE_SIZE_PROPERTY = "cayenne.DataRowStore.snapshot.size"; 100 public static final String REMOTE_NOTIFICATION_PROPERTY = "cayenne.DataRowStore.remote.notify"; 101 public static final String EVENT_BRIDGE_FACTORY_PROPERTY = "cayenne.DataRowStore.EventBridge.factory"; 102 103 105 public static final long SNAPSHOT_EXPIRATION_DEFAULT = 2 * 60 * 60; 107 public static final int SNAPSHOT_CACHE_SIZE_DEFAULT = 10000; 108 public static final boolean REMOTE_NOTIFICATION_DEFAULT = false; 109 110 public static final String EVENT_BRIDGE_FACTORY_DEFAULT = "org.objectstyle.cayenne.event.JavaGroupsBridgeFactory"; 113 114 protected String name; 115 protected LRUMap snapshots; 116 protected LRUMap snapshotLists; 117 protected boolean notifyingRemoteListeners; 118 119 protected transient EventBridge remoteNotificationsHandler; 120 121 protected transient EventSubject eventSubject; 124 125 128 public DataRowStore(String name) { 129 this(name, Collections.EMPTY_MAP); 130 } 131 132 141 public DataRowStore(String name, Map properties) { 142 if (name == null) { 143 throw new IllegalArgumentException ("DataRowStore name can't be null."); 144 } 145 146 this.name = name; 147 this.eventSubject = createSubject(); 148 initWithProperties(properties); 149 } 150 151 private EventSubject createSubject() { 152 return EventSubject.getSubject(this.getClass(), name); 153 } 154 155 protected void initWithProperties(Map properties) { 156 ExtendedProperties propertiesWrapper = new ExtendedProperties(); 157 158 if (properties != null) { 159 propertiesWrapper.putAll(properties); 160 } 161 162 long snapshotsExpiration = propertiesWrapper.getLong( 163 SNAPSHOT_EXPIRATION_PROPERTY, 164 SNAPSHOT_EXPIRATION_DEFAULT); 165 166 int snapshotsCacheSize = propertiesWrapper.getInt( 167 SNAPSHOT_CACHE_SIZE_PROPERTY, 168 SNAPSHOT_CACHE_SIZE_DEFAULT); 169 170 boolean notifyRemote = propertiesWrapper.getBoolean( 171 REMOTE_NOTIFICATION_PROPERTY, 172 REMOTE_NOTIFICATION_DEFAULT); 173 174 String eventBridgeFactory = propertiesWrapper.getString( 175 EVENT_BRIDGE_FACTORY_PROPERTY, 176 EVENT_BRIDGE_FACTORY_DEFAULT); 177 178 if (logObj.isDebugEnabled()) { 179 logObj.debug("DataRowStore property " 180 + SNAPSHOT_EXPIRATION_PROPERTY 181 + " = " 182 + snapshotsExpiration); 183 logObj.debug("DataRowStore property " 184 + SNAPSHOT_CACHE_SIZE_PROPERTY 185 + " = " 186 + snapshotsCacheSize); 187 logObj.debug("DataRowStore property " 188 + REMOTE_NOTIFICATION_PROPERTY 189 + " = " 190 + notifyRemote); 191 logObj.debug("DataRowStore property " 192 + EVENT_BRIDGE_FACTORY_PROPERTY 193 + " = " 194 + eventBridgeFactory); 195 } 196 197 this.notifyingRemoteListeners = notifyRemote; 199 200 this.snapshots = new LRUMap(snapshotsCacheSize); 202 203 this.snapshotLists = new LRUMap(snapshotsCacheSize); 206 207 if (notifyingRemoteListeners) { 209 try { 210 EventBridgeFactory factory = (EventBridgeFactory) Class 211 .forName(eventBridgeFactory) 212 .newInstance(); 213 this.remoteNotificationsHandler = factory 214 .createEventBridge(getSnapshotEventSubject(), properties); 215 216 222 EventManager.getDefaultManager().addNonBlockingListener(this, 223 "processRemoteEvent", 224 SnapshotEvent.class, 225 getSnapshotEventSubject(), 226 remoteNotificationsHandler); 227 228 remoteNotificationsHandler.startup(EventManager.getDefaultManager(), 231 EventBridge.RECEIVE_LOCAL_EXTERNAL); 232 } 233 catch (Exception ex) { 234 throw new CayenneRuntimeException("Error initializing DataRowStore.", ex); 235 } 236 } 237 } 238 239 242 public int size() { 243 return snapshots.size(); 244 } 245 246 249 public int maximumSize() { 250 return snapshots.maxSize(); 251 } 252 253 256 public void shutdown() { 257 if (remoteNotificationsHandler != null) { 258 try { 259 remoteNotificationsHandler.shutdown(); 260 } 261 catch (Exception ex) { 262 logObj.info("Exception shutting down EventBridge.", ex); 263 } 264 remoteNotificationsHandler = null; 265 } 266 267 clear(); 268 } 269 270 274 public String getName() { 275 return name; 276 } 277 278 public void setName(String name) { 279 this.name = name; 280 } 281 282 286 public synchronized DataRow getCachedSnapshot(ObjectId oid) { 287 return (DataRow) snapshots.get(oid); 288 } 289 290 295 public synchronized DataRow getSnapshot(ObjectId oid, QueryEngine engine) { 296 297 DataRow cachedSnapshot = getCachedSnapshot(oid); 299 if (cachedSnapshot != null) { 300 return cachedSnapshot; 301 } 302 303 if (logObj.isDebugEnabled()) { 304 logObj.debug("no cached snapshot for ObjectId: " + oid); 305 } 306 307 SelectQuery select = QueryUtils.selectObjectForId(oid); 309 SelectObserver observer = new SelectObserver(); 310 engine.performQueries(Collections.singletonList(select), observer); 311 List results = observer.getResults(select); 312 313 if (results.size() > 1) { 314 throw new CayenneRuntimeException("More than 1 object found for ObjectId " 315 + oid 316 + ". Fetch matched " 317 + results.size() 318 + " objects."); 319 } else if (results.size() == 0) { 320 return null; 321 } else { 322 DataRow snapshot = (DataRow) results.get(0); 323 snapshots.put(oid, snapshot); 324 return snapshot; 325 } 326 } 327 328 331 public void cacheSnapshots(String key, List snapshots) { 332 snapshotLists.put(key, snapshots); 333 } 334 335 338 public List getCachedSnapshots(String key) { 339 if (key == null) { 340 return null; 341 } 342 343 return (List ) snapshotLists.get(key); 344 } 345 346 349 public EventSubject getSnapshotEventSubject() { 350 return eventSubject; 351 } 352 353 356 public synchronized void clear() { 357 snapshots.clear(); 358 snapshotLists.clear(); 359 } 360 361 364 public synchronized void forgetSnapshot(ObjectId id) { 365 snapshots.remove(id); 366 } 367 368 372 public void processRemoteEvent(SnapshotEvent event) { 373 if (event.getPostedBy() == this 374 || event.getSource() != remoteNotificationsHandler) { 375 return; 376 } 377 378 if (logObj.isDebugEnabled()) { 379 logObj.debug("remote event: " + event); 380 } 381 382 Collection deletedSnapshotIds = event.getDeletedIds(); 383 Collection invalidatedSnapshotIds = event.getInvalidatedIds(); 384 Map diffs = event.getModifiedDiffs(); 385 Collection indirectlyModifiedIds = event.getIndirectlyModifiedIds(); 386 387 if (deletedSnapshotIds.isEmpty() 388 && invalidatedSnapshotIds.isEmpty() 389 && diffs.isEmpty() 390 && indirectlyModifiedIds.isEmpty()) { 391 logObj.warn("processRemoteEvent.. bogus call... no changes."); 392 return; 393 } 394 395 synchronized (this) { 396 processDeletedIDs(deletedSnapshotIds); 397 processInvalidatedIDs(deletedSnapshotIds); 398 processUpdateDiffs(diffs); 399 sendUpdateNotification( 400 event.getSource(), 401 diffs, 402 deletedSnapshotIds, 403 invalidatedSnapshotIds, 404 indirectlyModifiedIds); 405 } 406 } 407 408 413 public void processSnapshotChanges( 414 Object source, 415 Map updatedSnapshots, 416 Collection deletedSnapshotIds, 417 Collection indirectlyModifiedIds) { 418 419 this.processSnapshotChanges(source, updatedSnapshots, deletedSnapshotIds, Collections.EMPTY_LIST, indirectlyModifiedIds); 420 } 421 422 426 public void processSnapshotChanges( 427 Object source, 428 Map updatedSnapshots, 429 Collection deletedSnapshotIds, 430 Collection invalidatedSnapshotIds, 431 Collection indirectlyModifiedIds) { 432 433 435 if (deletedSnapshotIds.isEmpty() 436 && invalidatedSnapshotIds.isEmpty() 437 && updatedSnapshots.isEmpty() 438 && indirectlyModifiedIds.isEmpty()) { 439 logObj.warn("postSnapshotsChangeEvent.. bogus call... no changes."); 440 return; 441 } 442 443 synchronized (this) { 444 processDeletedIDs(deletedSnapshotIds); 445 processInvalidatedIDs(invalidatedSnapshotIds); 446 Map diffs = processUpdatedSnapshots(updatedSnapshots); 447 sendUpdateNotification( 448 source, 449 diffs, 450 deletedSnapshotIds, 451 invalidatedSnapshotIds, 452 indirectlyModifiedIds); 453 } 454 } 455 456 private void processDeletedIDs(Collection deletedSnapshotIDs) { 457 if (!deletedSnapshotIDs.isEmpty()) { 459 Iterator it = deletedSnapshotIDs.iterator(); 460 while (it.hasNext()) { 461 snapshots.remove(it.next()); 462 } 463 } 464 } 465 466 private void processInvalidatedIDs(Collection invalidatedSnapshotIds) { 467 if (!invalidatedSnapshotIds.isEmpty()) { 469 Iterator it = invalidatedSnapshotIds.iterator(); 470 while (it.hasNext()) { 471 snapshots.remove(it.next()); 472 } 473 } 474 } 475 476 private Map processUpdatedSnapshots(Map updatedSnapshots) { 477 Map diffs = null; 478 479 if (!updatedSnapshots.isEmpty()) { 481 Iterator it = updatedSnapshots.entrySet().iterator(); 482 while (it.hasNext()) { 483 Map.Entry entry = (Map.Entry ) it.next(); 484 485 ObjectId key = (ObjectId) entry.getKey(); 486 DataRow newSnapshot = (DataRow) entry.getValue(); 487 DataRow oldSnapshot = (DataRow) snapshots.put(key, newSnapshot); 488 489 492 494 507 if (oldSnapshot != null) { 508 if (oldSnapshot.getVersion() != newSnapshot.getReplacesVersion()) { 511 logObj 512 .debug("snapshot version changed, don't know what to do... Old: " 513 + oldSnapshot 514 + ", New: " 515 + newSnapshot); 516 forgetSnapshot(key); 517 continue; 518 } 519 520 Map diff = oldSnapshot.createDiff(newSnapshot); 521 522 if (diff != null) { 523 if (diffs == null) { 524 diffs = new HashMap (); 525 } 526 527 diffs.put(key, diff); 528 } 529 } 530 } 531 } 532 533 return diffs; 534 } 535 536 private void processUpdateDiffs(Map diffs) { 537 if (!diffs.isEmpty()) { 539 Iterator it = diffs.entrySet().iterator(); 540 while (it.hasNext()) { 541 Map.Entry entry = (Map.Entry ) it.next(); 542 ObjectId key = (ObjectId) entry.getKey(); 543 DataRow oldSnapshot = (DataRow) snapshots.remove(key); 544 545 if (oldSnapshot == null) { 546 continue; 547 } 548 549 DataRow newSnapshot = oldSnapshot.applyDiff((DataRow) entry.getValue()); 550 snapshots.put(key, newSnapshot); 551 } 552 } 553 } 554 555 private void sendUpdateNotification( 556 Object source, 557 Map diffs, 558 Collection deletedSnapshotIDs, 559 Collection invalidatedSnapshotIDs, 560 Collection indirectlyModifiedIds) { 561 562 if ((diffs != null && !diffs.isEmpty()) 564 || (deletedSnapshotIDs != null && !deletedSnapshotIDs.isEmpty()) 565 || (invalidatedSnapshotIDs != null && !invalidatedSnapshotIDs.isEmpty()) 566 || (indirectlyModifiedIds != null && !indirectlyModifiedIds.isEmpty())) { 567 568 SnapshotEvent event = new SnapshotEvent( 569 source, 570 this, 571 diffs, 572 deletedSnapshotIDs, 573 invalidatedSnapshotIDs, 574 indirectlyModifiedIds); 575 576 if (logObj.isDebugEnabled()) { 577 logObj.debug("postSnapshotsChangeEvent: " + event); 578 } 579 580 582 EventManager.getDefaultManager().postEvent(event, getSnapshotEventSubject()); 585 } 586 } 587 588 public boolean isNotifyingRemoteListeners() { 589 return notifyingRemoteListeners; 590 } 591 592 public void setNotifyingRemoteListeners(boolean notifyingRemoteListeners) { 593 this.notifyingRemoteListeners = notifyingRemoteListeners; 594 } 595 596 private void readObject(ObjectInputStream in) throws IOException , 598 ClassNotFoundException { 599 600 in.defaultReadObject(); 601 602 this.eventSubject = createSubject(); 604 } 605 } | Popular Tags |