KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > cayenne > access > DataRowStore


1 /*****************************************************************
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  ****************************************************************/

19
20 package org.apache.cayenne.access;
21
22 import java.io.IOException JavaDoc;
23 import java.io.ObjectInputStream JavaDoc;
24 import java.io.Serializable JavaDoc;
25 import java.util.Collection JavaDoc;
26 import java.util.Collections JavaDoc;
27 import java.util.HashMap JavaDoc;
28 import java.util.Iterator JavaDoc;
29 import java.util.List JavaDoc;
30 import java.util.Map JavaDoc;
31
32 import org.apache.cayenne.CayenneRuntimeException;
33 import org.apache.cayenne.DataObject;
34 import org.apache.cayenne.DataRow;
35 import org.apache.cayenne.ObjectId;
36 import org.apache.cayenne.PersistenceState;
37 import org.apache.cayenne.Persistent;
38 import org.apache.cayenne.access.event.SnapshotEvent;
39 import org.apache.cayenne.event.EventBridge;
40 import org.apache.cayenne.event.EventBridgeFactory;
41 import org.apache.cayenne.event.EventManager;
42 import org.apache.cayenne.event.EventSubject;
43 import org.apache.commons.collections.ExtendedProperties;
44 import org.apache.commons.collections.map.LRUMap;
45 import org.apache.commons.logging.Log;
46 import org.apache.commons.logging.LogFactory;
47
48 /**
49  * A fixed size cache of DataRows keyed by ObjectId.
50  * <p>
51  * <strong>Synchronization Note: </strong> DataRowStore synchronizes most operations on
52  * its own instance.
53  * </p>
54  *
55  * @author Andrus Adamchik
56  * @since 1.1
57  */

58 public class DataRowStore implements Serializable JavaDoc {
59
60     private static Log logger = LogFactory.getLog(DataRowStore.class);
61
62     // property keys
63
public static final String JavaDoc SNAPSHOT_EXPIRATION_PROPERTY = "cayenne.DataRowStore.snapshot.expiration";
64     public static final String JavaDoc SNAPSHOT_CACHE_SIZE_PROPERTY = "cayenne.DataRowStore.snapshot.size";
65     public static final String JavaDoc REMOTE_NOTIFICATION_PROPERTY = "cayenne.DataRowStore.remote.notify";
66     public static final String JavaDoc EVENT_BRIDGE_FACTORY_PROPERTY = "cayenne.DataRowStore.EventBridge.factory";
67
68     // default property values
69

70     // default expiration time is 2 hours
71
public static final long SNAPSHOT_EXPIRATION_DEFAULT = 2 * 60 * 60;
72     public static final int SNAPSHOT_CACHE_SIZE_DEFAULT = 10000;
73     public static final boolean REMOTE_NOTIFICATION_DEFAULT = false;
74
75     // use String for class name, since JavaGroups may not be around,
76
// causing CNF exceptions
77
public static final String JavaDoc EVENT_BRIDGE_FACTORY_DEFAULT = "org.apache.cayenne.event.JavaGroupsBridgeFactory";
78
79     protected String JavaDoc name;
80     protected LRUMap snapshots;
81     protected boolean notifyingRemoteListeners;
82
83     protected transient EventManager eventManager;
84     protected transient EventBridge remoteNotificationsHandler;
85
86     // IMPORTANT: EventSubject must be an ivar to avoid its deallocation
87
// too early, and thus disabling events.
88
protected transient EventSubject eventSubject;
89
90     /**
91      * Creates new named DataRowStore with default configuration.
92      *
93      * @deprecated since 3.0 as it instantiates an EventManager internally that can lead
94      * to dispatch thread leaks.
95      */

96     public DataRowStore(String JavaDoc name) {
97         this(name, Collections.EMPTY_MAP);
98     }
99
100     /**
101      * Creates new DataRowStore with a specified name and a set of properties. If no
102      * properties are defined, default values are used.
103      *
104      * @param name DataRowStore name. Used to idenitfy this DataRowStore in events, etc.
105      * Can't be null.
106      * @param properties Properties map used to configure DataRowStore parameters. Can be
107      * null.
108      * @deprecated since 3.0 as it instantiates an EventManager internally that can lead
109      * to dispatch thread leaks.
110      */

111     public DataRowStore(String JavaDoc name, Map JavaDoc properties) {
112         this(name, properties, new EventManager());
113     }
114
115     /**
116      * Creates new DataRowStore with a specified name and a set of properties. If no
117      * properties are defined, default values are used.
118      *
119      * @param name DataRowStore name. Used to idenitfy this DataRowStore in events, etc.
120      * Can't be null.
121      * @param properties Properties map used to configure DataRowStore parameters. Can be
122      * null.
123      * @param eventManager EventManager that should be used for posting and receiving
124      * events.
125      * @since 1.2
126      */

127     public DataRowStore(String JavaDoc name, Map JavaDoc properties, EventManager eventManager) {
128         if (name == null) {
129             throw new IllegalArgumentException JavaDoc("DataRowStore name can't be null.");
130         }
131
132         this.name = name;
133         this.eventSubject = createSubject();
134         this.eventManager = eventManager;
135         initWithProperties(properties);
136     }
137
138     private EventSubject createSubject() {
139         return EventSubject.getSubject(this.getClass(), name);
140     }
141
142     protected void initWithProperties(Map JavaDoc properties) {
143         ExtendedProperties propertiesWrapper = new ExtendedProperties();
144
145         if (properties != null) {
146             propertiesWrapper.putAll(properties);
147         }
148
149         long snapshotsExpiration = propertiesWrapper.getLong(
150                 SNAPSHOT_EXPIRATION_PROPERTY,
151                 SNAPSHOT_EXPIRATION_DEFAULT);
152
153         int snapshotsCacheSize = propertiesWrapper.getInt(
154                 SNAPSHOT_CACHE_SIZE_PROPERTY,
155                 SNAPSHOT_CACHE_SIZE_DEFAULT);
156
157         boolean notifyRemote = propertiesWrapper.getBoolean(
158                 REMOTE_NOTIFICATION_PROPERTY,
159                 REMOTE_NOTIFICATION_DEFAULT);
160
161         String JavaDoc eventBridgeFactory = propertiesWrapper.getString(
162                 EVENT_BRIDGE_FACTORY_PROPERTY,
163                 EVENT_BRIDGE_FACTORY_DEFAULT);
164
165         if (logger.isDebugEnabled()) {
166             logger.debug("DataRowStore property "
167                     + SNAPSHOT_EXPIRATION_PROPERTY
168                     + " = "
169                     + snapshotsExpiration);
170             logger.debug("DataRowStore property "
171                     + SNAPSHOT_CACHE_SIZE_PROPERTY
172                     + " = "
173                     + snapshotsCacheSize);
174             logger.debug("DataRowStore property "
175                     + REMOTE_NOTIFICATION_PROPERTY
176                     + " = "
177                     + notifyRemote);
178             logger.debug("DataRowStore property "
179                     + EVENT_BRIDGE_FACTORY_PROPERTY
180                     + " = "
181                     + eventBridgeFactory);
182         }
183
184         // init ivars from properties
185
this.notifyingRemoteListeners = notifyRemote;
186
187         // TODO: ENTRY EXPIRATION is not supported by commons LRU Map
188
this.snapshots = new LRUMap(snapshotsCacheSize);
189
190         // init event bridge only if we are notifying remote listeners
191
if (notifyingRemoteListeners) {
192             try {
193                 EventBridgeFactory factory = (EventBridgeFactory) Class.forName(
194                         eventBridgeFactory).newInstance();
195
196                 Collection JavaDoc subjects = Collections.singleton(getSnapshotEventSubject());
197                 String JavaDoc externalSubject = EventBridge
198                         .convertToExternalSubject(getSnapshotEventSubject());
199                 this.remoteNotificationsHandler = factory.createEventBridge(
200                         subjects,
201                         externalSubject,
202                         properties);
203             }
204             catch (Exception JavaDoc ex) {
205                 throw new CayenneRuntimeException("Error initializing DataRowStore.", ex);
206             }
207
208             startListeners();
209         }
210     }
211
212     /**
213      * Updates cached snapshots for the list of objects.
214      *
215      * @since 1.2
216      */

217     void snapshotsUpdatedForObjects(List JavaDoc objects, List JavaDoc snapshots, boolean refresh) {
218
219         int size = objects.size();
220
221         // sanity check
222
if (size != snapshots.size()) {
223             throw new IllegalArgumentException JavaDoc(
224                     "Counts of objects and corresponding snapshots do not match. "
225                             + "Objects count: "
226                             + objects.size()
227                             + ", snapshots count: "
228                             + snapshots.size());
229         }
230
231         Map JavaDoc modified = null;
232         Object JavaDoc eventPostedBy = null;
233
234         synchronized (this) {
235             for (int i = 0; i < size; i++) {
236                 Persistent object = (Persistent) objects.get(i);
237
238                 // skip HOLLOW objects as they likely were created from partial snapshots
239
if (object.getPersistenceState() == PersistenceState.HOLLOW) {
240                     continue;
241                 }
242
243                 ObjectId oid = object.getObjectId();
244
245                 // add snapshots if refresh is forced, or if a snapshot is
246
// missing
247

248                 DataRow cachedSnapshot = (DataRow) this.snapshots.get(oid);
249                 if (refresh || cachedSnapshot == null) {
250
251                     DataRow newSnapshot = (DataRow) snapshots.get(i);
252
253                     if (cachedSnapshot != null) {
254                         // use old snapshot if no changes occurred
255
if (object instanceof DataObject
256                                 && cachedSnapshot.equals(newSnapshot)) {
257                             ((DataObject) object).setSnapshotVersion(cachedSnapshot
258                                     .getVersion());
259                             continue;
260                         }
261                         else {
262                             newSnapshot.setReplacesVersion(cachedSnapshot.getVersion());
263                         }
264                     }
265
266                     if (modified == null) {
267                         modified = new HashMap JavaDoc();
268                         eventPostedBy = object.getObjectContext().getGraphManager();
269                     }
270
271                     modified.put(oid, newSnapshot);
272                 }
273             }
274
275             if (modified != null) {
276                 processSnapshotChanges(
277                         eventPostedBy,
278                         modified,
279                         Collections.EMPTY_LIST,
280                         Collections.EMPTY_LIST,
281                         Collections.EMPTY_LIST);
282             }
283         }
284     }
285
286     /**
287      * Returns current cache size.
288      */

289     public int size() {
290         return snapshots.size();
291     }
292
293     /**
294      * Returns maximum allowed cache size.
295      */

296     public int maximumSize() {
297         return snapshots.maxSize();
298     }
299
300     /**
301      * Shuts down any remote notification connections, and clears internal cache.
302      */

303     public void shutdown() {
304         stopListeners();
305         clear();
306     }
307
308     /**
309      * Returns the name of this DataRowStore. Name allows to create EventSubjects for
310      * event notifications addressed to or sent from this DataRowStore.
311      */

312     public String JavaDoc getName() {
313         return name;
314     }
315
316     /**
317      * Sets the name of this DataRowStore. Name allows to create EventSubjects for event
318      * notifications addressed to or sent from this DataRowStore.
319      */

320     public void setName(String JavaDoc name) {
321         this.name = name;
322     }
323
324     /**
325      * Returns an EventManager associated with this DataRowStore.
326      *
327      * @since 1.2
328      */

329     public EventManager getEventManager() {
330         return eventManager;
331     }
332
333     /**
334      * Sets an EventManager associated with this DataRowStore.
335      *
336      * @since 1.2
337      */

338     public void setEventManager(EventManager eventManager) {
339         if (eventManager != this.eventManager) {
340             stopListeners();
341             this.eventManager = eventManager;
342             startListeners();
343         }
344     }
345
346     /**
347      * Returns cached snapshot or null if no snapshot is currently cached for the given
348      * ObjectId.
349      */

350     public synchronized DataRow getCachedSnapshot(ObjectId oid) {
351         return (DataRow) snapshots.get(oid);
352     }
353
354     /**
355      * Returns EventSubject used by this SnapshotCache to notify of snapshot changes.
356      */

357     public EventSubject getSnapshotEventSubject() {
358         return eventSubject;
359     }
360
361     /**
362      * Expires and removes all stored snapshots without sending any notification events.
363      */

364     public synchronized void clear() {
365         snapshots.clear();
366     }
367
368     /**
369      * Evicts a snapshot from cache without generating any SnapshotEvents.
370      */

371     public synchronized void forgetSnapshot(ObjectId id) {
372         snapshots.remove(id);
373     }
374
375     /**
376      * Handles remote events received via EventBridge. Performs needed snapshot updates,
377      * and then resends the event to local listeners.
378      */

379     public void processRemoteEvent(SnapshotEvent event) {
380         if (event.getSource() != remoteNotificationsHandler) {
381             return;
382         }
383
384         if (logger.isDebugEnabled()) {
385             logger.debug("remote event: " + event);
386         }
387
388         Collection JavaDoc deletedSnapshotIds = event.getDeletedIds();
389         Collection JavaDoc invalidatedSnapshotIds = event.getInvalidatedIds();
390         Map JavaDoc diffs = event.getModifiedDiffs();
391         Collection JavaDoc indirectlyModifiedIds = event.getIndirectlyModifiedIds();
392
393         if (deletedSnapshotIds.isEmpty()
394                 && invalidatedSnapshotIds.isEmpty()
395                 && diffs.isEmpty()
396                 && indirectlyModifiedIds.isEmpty()) {
397             logger.warn("processRemoteEvent.. bogus call... no changes.");
398             return;
399         }
400
401         synchronized (this) {
402             processDeletedIDs(deletedSnapshotIds);
403             processInvalidatedIDs(deletedSnapshotIds);
404             processUpdateDiffs(diffs);
405             sendUpdateNotification(
406                     event.getPostedBy(),
407                     diffs,
408                     deletedSnapshotIds,
409                     invalidatedSnapshotIds,
410                     indirectlyModifiedIds);
411         }
412     }
413
414     /**
415      * Processes changes made to snapshots. Modifies internal cache state, and then sends
416      * the event to all listeners. Source of these changes is usually an ObjectStore.
417      */

418     public void processSnapshotChanges(
419             Object JavaDoc postedBy,
420             Map JavaDoc updatedSnapshots,
421             Collection JavaDoc deletedSnapshotIds,
422             Collection JavaDoc invalidatedSnapshotIds,
423             Collection JavaDoc indirectlyModifiedIds) {
424
425         // update the internal cache, prepare snapshot event
426

427         if (deletedSnapshotIds.isEmpty()
428                 && invalidatedSnapshotIds.isEmpty()
429                 && updatedSnapshots.isEmpty()
430                 && indirectlyModifiedIds.isEmpty()) {
431             logger.warn("postSnapshotsChangeEvent.. bogus call... no changes.");
432             return;
433         }
434
435         synchronized (this) {
436             processDeletedIDs(deletedSnapshotIds);
437             processInvalidatedIDs(invalidatedSnapshotIds);
438             Map JavaDoc diffs = processUpdatedSnapshots(updatedSnapshots);
439             sendUpdateNotification(
440                     postedBy,
441                     diffs,
442                     deletedSnapshotIds,
443                     invalidatedSnapshotIds,
444                     indirectlyModifiedIds);
445         }
446     }
447
448     private void processDeletedIDs(Collection JavaDoc deletedSnapshotIDs) {
449         // DELETED: evict deleted snapshots
450
if (!deletedSnapshotIDs.isEmpty()) {
451             Iterator JavaDoc it = deletedSnapshotIDs.iterator();
452             while (it.hasNext()) {
453                 snapshots.remove(it.next());
454             }
455         }
456     }
457
458     private void processInvalidatedIDs(Collection JavaDoc invalidatedSnapshotIds) {
459         // INVALIDATED: forget snapshot, treat as expired from cache
460
if (!invalidatedSnapshotIds.isEmpty()) {
461             Iterator JavaDoc it = invalidatedSnapshotIds.iterator();
462             while (it.hasNext()) {
463                 snapshots.remove(it.next());
464             }
465         }
466     }
467
468     private Map JavaDoc processUpdatedSnapshots(Map JavaDoc updatedSnapshots) {
469         Map JavaDoc diffs = null;
470
471         // MODIFIED: replace/add snapshots, generate diffs for event
472
if (!updatedSnapshots.isEmpty()) {
473             Iterator JavaDoc it = updatedSnapshots.entrySet().iterator();
474             while (it.hasNext()) {
475                 Map.Entry JavaDoc entry = (Map.Entry JavaDoc) it.next();
476
477                 ObjectId key = (ObjectId) entry.getKey();
478                 DataRow newSnapshot = (DataRow) entry.getValue();
479                 DataRow oldSnapshot = (DataRow) snapshots.put(key, newSnapshot);
480
481                 // generate diff for the updated event, if this not a new
482
// snapshot
483

484                 // The following cases should be handled here:
485

486                 // 1. There is no previously cached snapshot for a given id.
487
// 2. There was a previously cached snapshot for a given id,
488
// but it expired from cache and was removed. Currently
489
// handled as (1); what are the consequences of that?
490
// 3. There is a previously cached snapshot and it has the
491
// *same version* as the "replacesVersion" property of the
492
// new snapshot.
493
// 4. There is a previously cached snapshot and it has a
494
// *different version* from "replacesVersion" property of
495
// the new snapshot. It means that we don't know how to merge
496
// the two (we don't even know which one is newer due to
497
// multithreading). Just throw out this snapshot....
498

499                 if (oldSnapshot != null) {
500                     // case 4 above... have to throw out the snapshot since
501
// no good options exist to tell how to merge the two.
502
if (oldSnapshot.getVersion() != newSnapshot.getReplacesVersion()) {
503                         logger
504                                 .debug("snapshot version changed, don't know what to do... Old: "
505                                         + oldSnapshot
506                                         + ", New: "
507                                         + newSnapshot);
508                         forgetSnapshot(key);
509                         continue;
510                     }
511
512                     Map JavaDoc diff = oldSnapshot.createDiff(newSnapshot);
513
514                     if (diff != null) {
515                         if (diffs == null) {
516                             diffs = new HashMap JavaDoc();
517                         }
518
519                         diffs.put(key, diff);
520                     }
521                 }
522             }
523         }
524
525         return diffs;
526     }
527
528     private void processUpdateDiffs(Map JavaDoc diffs) {
529         // apply snapshot diffs
530
if (!diffs.isEmpty()) {
531             Iterator JavaDoc it = diffs.entrySet().iterator();
532             while (it.hasNext()) {
533                 Map.Entry JavaDoc entry = (Map.Entry JavaDoc) it.next();
534                 ObjectId key = (ObjectId) entry.getKey();
535                 DataRow oldSnapshot = (DataRow) snapshots.remove(key);
536
537                 if (oldSnapshot == null) {
538                     continue;
539                 }
540
541                 DataRow newSnapshot = oldSnapshot.applyDiff((DataRow) entry.getValue());
542                 snapshots.put(key, newSnapshot);
543             }
544         }
545     }
546
547     private void sendUpdateNotification(
548             Object JavaDoc postedBy,
549             Map JavaDoc diffs,
550             Collection JavaDoc deletedSnapshotIDs,
551             Collection JavaDoc invalidatedSnapshotIDs,
552             Collection JavaDoc indirectlyModifiedIds) {
553
554         // do not send bogus events... e.g. inserted objects are not counted
555
if ((diffs != null && !diffs.isEmpty())
556                 || (deletedSnapshotIDs != null && !deletedSnapshotIDs.isEmpty())
557                 || (invalidatedSnapshotIDs != null && !invalidatedSnapshotIDs.isEmpty())
558                 || (indirectlyModifiedIds != null && !indirectlyModifiedIds.isEmpty())) {
559
560             SnapshotEvent event = new SnapshotEvent(
561                     this,
562                     postedBy,
563                     diffs,
564                     deletedSnapshotIDs,
565                     invalidatedSnapshotIDs,
566                     indirectlyModifiedIds);
567
568             if (logger.isDebugEnabled()) {
569                 logger.debug("postSnapshotsChangeEvent: " + event);
570             }
571
572             // synchronously notify listeners; leaving it up to the listeners to
573
// register as "non-blocking" if needed.
574
eventManager.postEvent(event, getSnapshotEventSubject());
575         }
576     }
577
578     public boolean isNotifyingRemoteListeners() {
579         return notifyingRemoteListeners;
580     }
581
582     public void setNotifyingRemoteListeners(boolean notifyingRemoteListeners) {
583         this.notifyingRemoteListeners = notifyingRemoteListeners;
584     }
585
586     // deserialization support
587
private void readObject(ObjectInputStream JavaDoc in) throws IOException JavaDoc,
588             ClassNotFoundException JavaDoc {
589
590         in.defaultReadObject();
591
592         // restore subjects
593
this.eventSubject = createSubject();
594     }
595
596     void stopListeners() {
597         eventManager.removeListener(this);
598         if (remoteNotificationsHandler != null) {
599             try {
600                 remoteNotificationsHandler.shutdown();
601             }
602             catch (Exception JavaDoc ex) {
603                 logger.info("Exception shutting down EventBridge.", ex);
604             }
605             remoteNotificationsHandler = null;
606         }
607     }
608
609     void startListeners() {
610         if (remoteNotificationsHandler != null) {
611             try {
612                 // listen to EventBridge ... must add itself as non-blocking listener
613
// otherwise a deadlock can occur as "processRemoteEvent" will attempt to
614
// obtain a lock on this object when the dispatch queue is locked... And
615
// another commit thread may have this object locked and attempt to lock
616
// dispatch queue
617

618                 eventManager.addNonBlockingListener(
619                         this,
620                         "processRemoteEvent",
621                         SnapshotEvent.class,
622                         getSnapshotEventSubject(),
623                         remoteNotificationsHandler);
624
625                 // start EventBridge - it will listen to all event sources for this
626
// subject
627
remoteNotificationsHandler.startup(
628                         eventManager,
629                         EventBridge.RECEIVE_LOCAL_EXTERNAL);
630             }
631             catch (Exception JavaDoc ex) {
632                 throw new CayenneRuntimeException("Error initializing DataRowStore.", ex);
633             }
634         }
635     }
636 }
637
Popular Tags