KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > ozoneDB > core > storage > wizardStore > ClusterStore


1 // You can redistribute this software and/or modify it under the terms of
2
// the Ozone Core License version 1 published by ozone-db.org.
3
//
4
// The original code and portions created by SMB are
5
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
6
//
7
// $Id: ClusterStore.java,v 1.3 2004/01/10 21:40:24 per_nyfelt Exp $
8

package org.ozoneDB.core.storage.wizardStore;

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.ozoneDB.DxLib.*;
import org.ozoneDB.Setup;
import org.ozoneDB.core.*;
import org.ozoneDB.core.storage.*;
import org.ozoneDB.core.storage.Cluster;
import org.ozoneDB.core.storage.ClusterID;
import org.ozoneDB.io.stream.ResolvingObjectInputStream;
import org.ozoneDB.util.LogWriter;
/**
 * The ClusterStore is the back-end store of the wizardStore. It maintains the
 * cluster cache, activation/passivation and the actual persistent commits.
 *
 * @author <a HREF="http://www.softwarebuero.de/">SMB</a>
 * @author <a HREF="http://www.medium.net/">Medium.net</a>
 * @version $Revision: 1.3 $Date: 2004/01/10 21:40:24 $
 */

33 public final class ClusterStore extends AbstractClusterStore {
34
35     public final static String JavaDoc POSTFIX_SHADOW = ".sh";
36
37     protected final static int compressionFactor = 3;
38
39     protected DxMap cachedClusters;
40
41     protected int maxClusterSize = 64 * 1024;
42
43     /**
44      * Table that maps Permissions to ClusterIDs.
45      */

46     protected DxMap growingClusterIDs;
47
48     private boolean compressClusters;
49
50
51     ClusterStore(Env _env) {
52         super(_env);
53         maxClusterSize = env.config.intProperty(Setup.WS_CLUSTER_SIZE, -1);
54         cachedClusters = new DxHashMap(64);
55         compressClusters = env.config.booleanProperty(Setup.WS_COMPRESS_CLUSTERS, true);
56     }
57
58
59     public void startup() throws Exception JavaDoc {
60         growingClusterIDs = new DxHashMap(32);
61     }
62
63
64     public void shutdown() {
65     }
66
67
68     /**
69      * Check if the ClusterStore was cleanly shutted down.
70      */

71     public boolean isCleanShutdown() {
72         File file = new File(env.getDatabaseDir() + Env.DATA_DIR);
73         String JavaDoc[] fileList = file.list();
74
75         for (int i = 0; i < fileList.length; i++) {
76             if (fileList[i].endsWith(POSTFIX_SHADOW) || fileList[i].endsWith(POSTFIX_TEMP)) {
77                 return false;
78             }
79         }
80         return true;
81     }
82
83
84     /**
85      * Search the DATA dir and recover all ClusterIDs.
86      */

87     public DxSet recoverClusterIDs() {
88         File file = new File(env.getDatabaseDir() + Env.DATA_DIR);
89         String JavaDoc[] fileList = file.list();
90
91         DxSet result = new DxHashSet();
92         for (int i = 0; i < fileList.length; i++) {
93             if (fileList[i].endsWith(POSTFIX_CLUSTER) || fileList[i].endsWith(POSTFIX_SHADOW)) {
94                 String JavaDoc cidString = fileList[i].substring(0, fileList[i].indexOf('.'));
95                 long cid = Long.parseLong(cidString);
96                 result.add(new ClusterID(cid));
97             }
98         }
99         return result;
100     }
101
102
103     public long currentCacheSize() {
104         long result = 0;
105         DxIterator it = cachedClusters.iterator();
106         Cluster cluster;
107         while ((cluster = (Cluster) it.next()) != null) {
108             result += cluster.size();
109         }
110         return result;
111     }
112
113
114     public int currentBytesPerContainer() {
115         int result = env.config.intProperty(Setup.WS_CLUSTER_SIZE_RATIO, 256);
116 // env.logWriter.newEntry( this, "currentBytesPerContainer(): setup:" + result, LogWriter.DEBUG );
117
return result;
118
119         // if (cachedClusters.count() < 3) {
120
// int result = env.config.intProperty (Setup.WS_CLUSTER_SIZE_RATIO, 256);
121
// env.logWriter.newEntry (this, "currentBytesPerContainer(): config:" + result, LogWriter.DEBUG);
122
// return result;
123
// }
124
// else {
125
// int bpc = 0;
126
// int count = 0;
127
// DxIterator it = cachedClusters.iterator();
128
// WizardCluster cluster;
129
// while ((cluster=(WizardCluster)it.next()) != null) {
130
// count ++;
131
// bpc += cluster.bytesPerContainer;
132
// }
133
// int result = bpc / count;
134
// env.logWriter.newEntry (this, "currentBytesPerContainer(): new:" + result, LogWriter.DEBUG);
135
// return result;
136
// }
137
}
138
139
    // public WizardCluster lruCluster() {
    //     search the LRU cluster to speed things up; since this is not
    //     synchronized, checking and accessing currentCluster must be done in
    //     one line to avoid other threads changing the variable in between:
    //     container = (currentCluster != null && currentCluster.lock != null) ? currentCluster.containerForID (id) : null;
    //     if (container != null) {
    //         // System.out.print ("+");
    //         return container.isDeleted() ? null : container;
    //     }
151     /**
152      * @param perms Permissions of the cluster to search.
153      * @return WizardCluster with the specified permissions that is good to store a
154      * new container in it.
155      */

156     protected synchronized Cluster growingCluster(Permissions perms) throws Exception JavaDoc {
157         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
158             env.logWriter.newEntry(this, "growingCluster() ", LogWriter.DEBUG3);
159         }
160
161         Cluster cluster = null;
162         ClusterID cid = (ClusterID) growingClusterIDs.elementForKey(perms);
163
164         // load the current growing cluster and check space
165
if (cid != null) {
166             cluster = (Cluster) cachedClusters.elementForKey(cid);
167             if (cluster == null) {
168                 cluster = loadCluster(cid, true);
169                 if (cluster instanceof WizardCluster) {
170                     ((WizardCluster)cluster).unpin();
171                 }
172             }
173             // check cluster size and if it was deactivated by the trimCache();
174
// use this cluster only if it isn't used by another ta
175
if (cluster.lock() == null || cluster.size() >= maxClusterSize ||
176                     cluster.lock().level(null) > Lock.LEVEL_NONE && !cluster.lock().isAcquiredBy(env.transactionManager.currentTA())) {
177
178                 if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
179                     env.logWriter.newEntry(this,
180                             "growingCluster(): growing cluster not usable: cid=" + cluster.clusterID() + " size=" + cluster.size() + " lockLevel=" +
181                             (cluster.lock() != null ? String.valueOf(cluster.lock().level(null)) : "null"),
182                             LogWriter.DEBUG1);
183                 }
184
185                 growingClusterIDs.removeForKey(perms);
186                 cluster = null;
187             }
188         }
189
190         // search all currently loaded clusters
191
if (cluster == null) {
192             DxIterator it = cachedClusters.iterator();
193             Cluster cursor;
194             while ((cursor = (Cluster) it.next()) != null) {
195                 // System.out.println (cursor.size());
196
if (cursor.size() < maxClusterSize && cursor.permissions().equals(perms)) {
197                     cluster = cursor;
198
199                     // make sure that there is enough space for the clusters to be
200
// able to grow to the max size
201
// ensureCacheSpace (maxClusterSize - cluster.size());
202
trimCache();
203
204                     // check if the cluster deactivated be the ensureCacheSpace
205
if (cluster.lock() == null) {
206                         env.logWriter.newEntry(this,
207                                 "growingCluster(): loaded cluster was deactivated: " + cluster.clusterID(),
208                                 LogWriter.DEBUG);
209                         cluster = null;
210                     } else if (cluster.lock().level(null) > Lock.LEVEL_NONE && !cluster.lock().isAcquiredBy(
211                             env.transactionManager.currentTA())) {
212                         // use this cluster only if it isn't used by another ta
213
if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
214                             env.logWriter.newEntry(this,
215                                     "growingCluster(): loaded cluster is locked by another transaction: "
216                                     + cluster.clusterID(), LogWriter.DEBUG1);
217                         }
218                         cluster = null;
219                     } else {
220                         growingClusterIDs.addForKey(cluster.clusterID(), perms);
221                         if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
222                             env.logWriter.newEntry(this,
223                                     "growingCluster(): loaded cluster is now growing cluster: " + cluster.clusterID()
224                                     + " size:" + cluster.size(), LogWriter.DEBUG1);
225                         }
226                         break;
227                     }
228                 }
229             }
230         }
231
232         // write a new, empty cluster and load it just after to ensures
233
// that new cluster is "regularly" loaded
234
if (cluster == null) {
235             cluster = createANewEmptyAndUsableCluster(perms);
236         }
237
238         return cluster;
239     }
240
241     /**
242      Creates a cluster which is
243      <UL>
244      <LI>new</LI>
245      <LI>empty</LI>
246      <LI>usable and</LI>
247      <LI>not locked</LI>
248      </UL>
249      */

250     protected synchronized Cluster createANewEmptyAndUsableCluster(Permissions perms) throws IOException, ClassNotFoundException JavaDoc {
251 // env.logWriter.newEntry( this, "growingCluster(): creating new cluster...", LogWriter.DEBUG );
252
Cluster cluster = new WizardCluster(new ClusterID(env.keyGenerator.nextID()), perms, env.transactionManager.newLock(), 256);
253
254         // the new cluster has to be written to disk in order to make
255
// saveShadow() and things work;
256
storeData(cluster, basename(cluster.clusterID()) + POSTFIX_CLUSTER);
257         /* // Old
258         // If we do not pin, the freshly created cluster may be deactivated and thus its lock may be null
259         cluster.pin();
260         try {
261             // since we don't check the cache size after registering a cont
262             // we have to make sure that there is enough space for this cluster
263             // to grow to the max size
264             // ensureCacheSpace (maxClusterSize);
265             trimCache();
266             cluster = loadCluster(cluster.clusterID(), false);
267         } finally {
268             cluster.unpin();
269         }
270         */

271         // since we don't check the cache size after registering a cont
272
// we have to make sure that there is enough space for this cluster
273
// to grow to the max size
274
// ensureCacheSpace (maxClusterSize);
275
trimCache();
276
277         // We need to load the cluster pinned because loadCluster guarantees only to return a not-unloaded cluster if it is pinned.
278
cluster = loadCluster(cluster.clusterID(), true);
279         if (cluster instanceof WizardCluster) {
280             ((WizardCluster)cluster).unpin();
281         }
282
283         growingClusterIDs.addForKey(cluster.clusterID(), perms);
284 // env.logWriter.newEntry( this, "growingCluster(): new cluster created: " + cluster.clusterID(), LogWriter.DEBUG );
285

286         return cluster;
287     }
288
289     /**
290      Returns or creates a cluster which is not locked so that locking it will succeed.
291      The returned cluster is only guaranteed to be not locked by any other thread as long as this
292      method is called during synchronization to this ClusterStore.
293      */

294     protected Cluster giveMeAnUnlockedCluster(Permissions perms) throws IOException, ClassNotFoundException JavaDoc {
295         return createANewEmptyAndUsableCluster(perms);
296     }
297
298     /**
299      Associates the specified container with a cluster.
300
301      Iff this method returns normally (without exception), the container is pinned and thus
302      has to be unpinned.
303
304      Iff this method returns normally (without exception), the container (and thus the cluster of the container)
305      is write locked
306
307      @param container Container to be registered with one cluster.
308      */

309     public void registerContainerAndLock(StorageObjectContainer container, Permissions perms, Transaction locker, int lockLevel) throws Exception JavaDoc {
310         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
311             env.logWriter.newEntry(this, "registerContainer()", LogWriter.DEBUG3);
312         }
313
314         Cluster cluster = null;
315
316         boolean pinned = false;
317         boolean locked = false;
318         boolean alright = false;
319
320         try {
321             synchronized (this) {
322                 cluster = growingCluster(perms);
323
324                 Lock clusterLock = cluster.lock();
325                 int prevLevel = clusterLock.tryAcquire(locker, lockLevel);
326
327                 if (prevLevel == Lock.NOT_ACQUIRED) { // The cluster we are trying to lock is already locked, so we take another cluster
328
cluster = giveMeAnUnlockedCluster(perms);
329
330                     clusterLock = cluster.lock();
331                     prevLevel = clusterLock.tryAcquire(locker, lockLevel);
332
333                     if (prevLevel == Lock.NOT_ACQUIRED) {
334                         throw new Error JavaDoc("BUG! We could not acquire a lock for an unlocked cluster.");
335                     }
336                 }
337                 locked = true;
338
339                 cluster.registerContainer(container);
340                 container.pin();
341                 pinned = true;
342             }
343             cluster.updateLockLevel(locker);
344
345             if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
346                 env.logWriter.newEntry(this, " cluster: " + cluster.clusterID(), LogWriter.DEBUG3);
347             }
348             alright = true;
349         } finally {
350             if (!alright) {
351                 if (locked) {
352                     cluster.lock().release(locker);
353                 }
354                 if (pinned) {
355                     container.unpin();
356                 }
357             }
358         }
359     }
360
361
362     public void invalidateContainer(StorageObjectContainer container) /*throws Exception*/ {
363         synchronized (container) {
364             container.getCluster().removeContainer(container);
365             container.setCluster(null);
366         }
367     }
368
369
370     protected Cluster restoreCluster(ClusterID cid) throws Exception JavaDoc {
371         String JavaDoc basename = basename(cid);
372         Cluster cluster;
373
374         new File(basename + POSTFIX_LOCK).delete();
375         new File(basename + POSTFIX_TEMP).delete();
376
377         File shadowFile = new File(basename + POSTFIX_SHADOW);
378         File clusterFile = new File(basename + POSTFIX_CLUSTER);
379
380         if (shadowFile.exists()) {
381             /*
382                 FIXME:
383                 Who says that shadow files are always better than cluster files?
384                 Ozone may have crashed just when starting to write the shadow file.
385                 The following if clause catches this if the file size is 0.
386                 But what if the file is written incompletely, but it's length is not zero?
387                 It will be regarded as intact copy, while it is not.
388
389                 Maybe we should return to atomic rename at WizardCluster.saveShadow()
390             */

391             if (shadowFile.length() > 0) {
392                 if (!shadowFile.renameTo(clusterFile)) {
393                     throw new IOException("Unable to rename shadow file.");
394                 }
395             } else {
396                 shadowFile.delete();
397             }
398         }
399         cluster = (Cluster) loadData(basename + POSTFIX_CLUSTER);
400         activateCluster(cluster, 0);
401
402         return cluster;
403     }
404
405
406     /**
407      * Make sure the corresponding cluster is in the cache. While loading
408      * clusters, we may have to throw away (and maybe store) some currently
409      * cached clusters.
410      *
411      *
412      * @param cid ClusterID of the cluster to load.
413      * @param pin
414      wether the loaded cluster should be pinned as soon as it is loaded
415      so that there may be no chance to unload unless it is unpinned.
416      If this parameter is set to true, the user has to unpin the cluster.
417      If this parameter is set to false, the cluster may already be unloaded when this method returns.
418      after using it.
419      */

420     public Cluster loadCluster(ClusterID cid, boolean pin) throws IOException, ClassNotFoundException JavaDoc {
421         Cluster cluster = (Cluster) cachedClusters.elementForKey(cid);
422         if (cluster == null) {
423
424             if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
425                 env.logWriter.newEntry(this, "loadCluster(): load cluster from disk: " + cid.toString(), LogWriter.DEBUG);
426             }
427
428             String JavaDoc basename = basename(cid);
429             String JavaDoc clusterName = basename + POSTFIX_CLUSTER;
430             String JavaDoc lockName = basename + POSTFIX_LOCK;
431
432             int clusterByteSize = (int) new File(clusterName).length();
433             if (compressClusters) {
434                 clusterByteSize *= compressionFactor;
435             }
436
437             // make sure that many different threads don't load
438
// to much data before the currently synchronized thread
439
// can trim the cache
440
trimCache();
441             cluster = (Cluster) loadData(clusterName);
442
443             synchronized (this) {
444
445                 // now we have to check the cachedClusters table inside the
446
// synchronized block to see if someone did register this
447
// cluster while we loaded it
448
Cluster interimCluster = (Cluster) cachedClusters.elementForKey(cid);
449                 if (interimCluster != null) {
450                     //with appropriate locking|pinning, it is no problem or danger.
451
env.logWriter.newEntry(this, "loadCluster(): cluster was loaded by another thread too; droping my copy", LogWriter.DEBUG);
452
453                     cluster = interimCluster;
454
455                     if (pin && cluster instanceof WizardCluster) {
456                         ((WizardCluster)cluster).pin();
457                     }
458                     
459                 } else {
460                     // we are going to mess with the cluster; it seems that the cluster
461
// is not visible to other thread until it is added to cachedClusters,
462
// however, IBM jdk throws an exception in cluster.updateLockLevel, which
463
// seems to be related to the initialization in the following block
464
synchronized (cluster) {
465                         // locks are only there if the lock level is >= READ
466
try {
467                             cluster.setLock((Lock) loadData(lockName));
468                             new File(lockName).delete();
469                             ((MROWLock) cluster.lock()).setDebugInfo("clusterID=" + cluster.clusterID());
470                         } catch (Exception JavaDoc e) {
471                             if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
472                                 env.logWriter.newEntry(this, " Unable to load lock from disk - creating a new lock.", LogWriter.DEBUG3);
473                             }
474                             cluster.setLock(env.transactionManager.newLock());
475                             ((MROWLock) cluster.lock()).setDebugInfo("clusterID=" + cluster.clusterID());
476                         }
477
478                         if (pin && cluster instanceof WizardCluster) { // We pin inside the synchronization to the cluster, because calling pin() will try another synchronization and two nested synchronizations to an object are faster than two serial synchronizations.
479
((WizardCluster)cluster).pin();
480                         }
481
482                         activateCluster(cluster, clusterByteSize);
483                     }
484
485                     if (clusterByteSize > maxClusterSize * 2) {
486                         splitCluster(cluster);
487                     }
488
489                     cachedClusters.addForKey(cluster, cluster.clusterID());
490
491                     trimCache();
492                 }
493             }
494         } else {
495             synchronized (cluster) {
496                 if (pin && cluster instanceof WizardCluster) {
497                     ((WizardCluster)cluster).pin();
498                 }
499             }
500         }
501         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
502             env.logWriter.newEntry(this, "returning WizardCluster: " + cluster, LogWriter.DEBUG3);
503         }
504         return cluster;
505     }
506
507
508     public void splitCluster(Cluster cluster) {
509     }
510
511
512     /**
513      * Remove cluster from the cluster cache.
514      * @param cid
515      */

516     public void unloadCluster(ClusterID cid, boolean deactivate) throws IOException {
517         if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
518             env.logWriter.newEntry(this, "unloadCluster(" + cid + "," + deactivate + ").", LogWriter.DEBUG);
519         }
520         
521         Cluster cluster = (Cluster) cachedClusters.removeForKey(cid);
522
523         if (deactivate) {
524             deactivateCluster(cluster);
525         }
526     }
527
528
529     /**
530      * Ensure that there is at least the specified size of free space in the
531      * cluster cache. Under some circumstances clusters (currently invoked)
532      * cannot be deactivated. Therefore this method cannot guarantee that the
533      * needed space is free afterwards.<p>
534      *
535      * This is the central method of the deactivation of containers that are
536      * currently in use. This is different from the commit behaviour.
537      */

538     protected void trimCache() throws IOException {
539
540         long freeSpace = env.freeMemory();
541         if (false && env.logWriter.hasTarget(LogWriter.DEBUG)) {
542             env.logWriter.newEntry(this, "trimCache(): free:" + freeSpace, LogWriter.DEBUG2);
543         }
544
545         boolean tryRemoveCluster = true;
546         while (freeSpace <= 0 && tryRemoveCluster) {
547             tryRemoveCluster = false;
548             synchronized (this) {
549                 long cacheSize = 0;
550
551                 // build priority queue for all currently loaded clusters
552
DxMap priorityQueue = new DxTreeMap();
553                 DxIterator it = cachedClusters.iterator();
554                 Cluster cluster;
555                 while ((cluster = (Cluster) it.next()) != null) {
556                     priorityQueue.addForKey(cluster, cluster.cachePriority());
557                     cacheSize += cluster.size();
558                 }
559
560                 // free at least 20% of the cache
561
long cacheSizeToRemove = cacheSize / 5;
562                 if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
563                     env.logWriter.newEntry(this, " cache: " + cacheSize + " to be freed:" + cacheSizeToRemove, LogWriter.DEBUG2);
564                 }
565
566                 // throw away (deactivate) clusters, lowest priority first
567
it = priorityQueue.iterator();
568                 while (cacheSizeToRemove > 0 && (cluster = (WizardCluster) it.next()) != null) {
569
570                     // if any of the containers is currently invoked, the cluster
571
// must not be written and must stay in memory
572
// The same applies for pinned containers.
573
// FIXME: Once pinning is fully established, we may not need to call isInvoked() anymore.
574
if (cluster instanceof WizardCluster) {
575                         WizardCluster wizardCluster = (WizardCluster) cluster;
576                         if ((!wizardCluster.isPinned()) && (!wizardCluster.isInvoked())) {
577                             if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
578                                 env.logWriter.newEntry(this, "DEACTIVATE cluster: " + cluster.clusterID(), LogWriter.DEBUG2);
579                             }
580
581                             cluster = (Cluster) it.removeObject();
582                             cacheSizeToRemove -= cluster.size();
583                             unloadCluster(cluster.clusterID(), true);
584                             tryRemoveCluster = true;
585     // if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
586
// env.logWriter.newEntry(this, " free:" + freeSpace, LogWriter.DEBUG2);
587
// }
588
} else {
589                             if (false) {
590                                 env.logWriter.newEntry(this, "trying to DEACTIVATE 'invoked' cluster: " + cluster.clusterID(), LogWriter.WARN);
591                             }
592                         }
593                     } else {
594                         env.logWriter.newEntry(this, "the cluster is not a WizardCluster, not sure what to do", LogWriter.WARN);
595                     }
596                 }
597                 System.gc();
598                 freeSpace = env.freeMemory();
599             }
600         }
601     }
602
603
604     /**
605      * This method is called right after the specified WizardCluster was loaded from
606      * disk.
607      */

608     protected void activateCluster(Cluster cluster, int size) {
609         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
610             env.logWriter.newEntry(this, "activateCluster(): " + cluster.clusterID(), LogWriter.DEBUG3);
611         }
612         cluster.setEnv(env);
613         cluster.setClusterStore(this);
614         cluster.touch();
615         cluster.setCurrentSize(size);
616     }
617
618
619     /**
620      * Deactivate the specified cluster before it is written to disk. The
621      * specified cluster will be removed from the cluster cache. If it currently
622      * has shadows, they are written to disk. If any of the containers are
623      * currently invoked (should normally never happen), the shadows must stay
624      * in memory.
625      */

626     protected void deactivateCluster(Cluster cluster) throws IOException {
627         if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
628             env.logWriter.newEntry(this,
629                     "deactivateCluster(): " + cluster.clusterID() + " priority: " + cluster.cachePriority(),
630                     LogWriter.DEBUG);
631             env.logWriter.newEntry(this, " lock: " + cluster.lock().level(null), LogWriter.DEBUG);
632         }
633
634         String JavaDoc basename = basename(cluster.clusterID());
635
636         synchronized (this) { // We synchronize on this ClusterStore so that a freshly returned cluster within the ClusterStore lock may not be deactivated during the lock time.
637
synchronized (cluster) {
638                 // any lock levels >= READ has to be persistent
639
if (cluster.lock().level(null) >= Lock.LEVEL_READ) {
640                     if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
641                         env.logWriter.newEntry(this, " write lock to disk: " + cluster.clusterID(), LogWriter.DEBUG);
642                     }
643
644                     storeData(cluster.lock(), basename + POSTFIX_LOCK);
645                 } else {
646                     File lockFile = new File(basename + POSTFIX_LOCK);
647                     if (lockFile.exists()) {
648                         lockFile.delete();
649                     }
650                 }
651
652                 // clusters with WRITE lock are supposed to be dirty
653
if (cluster.lock().level(null) > Lock.LEVEL_UPGRADE) {
654                     if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
655                         env.logWriter.newEntry(this, " write cluster: " + cluster.clusterID(), LogWriter.DEBUG);
656                     }
657
658                     storeData(cluster, basename + POSTFIX_CLUSTER);
659                 }
660
661                 // mark the cluster to be not valid
662
cluster.setLock(null);
663             }
664         }
665     }
666
667
668     /**
669      * Store the specified cluster on disk. Write temp files first. If this
670      * write fails, the original are still valid. The cluster may has been
671      * written to the disk already, if is was deactivated while transaction.
672      * But in case the cluster (and its changes) are only in memory, we have to
673      * write now to check if this is possible without errors.
674      *
675      * Note: This only writes all currently commited transaction results to the
676      * disk. This is different from the deactivation behaviour.
677      *
678      *
679      * @param cid WizardCluster to be prepare-commited.
680      * @exception java.io.IOException None of the clusters are written to disk.
681      */

682     public synchronized void prepareCommitCluster(Transaction ta, ClusterID cid) throws IOException, ClassNotFoundException JavaDoc {
683         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
684             env.logWriter.newEntry(this, "prepareCommitCluster(): " + cid, LogWriter.DEBUG3);
685         }
686
687 // WizardCluster cluster = loadCluster(cid,false);
688

689         // If we do not pin, loadCluster may just loose the loaded cluster due to trim() after load
690

691         Cluster cluster = loadCluster(cid, true);
692         if (cluster instanceof WizardCluster) {
693             ((WizardCluster)cluster).unpin();
694         }
695         cluster.prepareCommit(ta);
696         if (cluster.lock().level(null) >= Lock.LEVEL_WRITE) {
697             String JavaDoc tempFilename = basename(cid) + POSTFIX_TEMP;
698
699             // write changed cluster in temp file; the lock is written in
700
// commit() and abort()
701
storeData(cluster, tempFilename);
702
703             long fileSize = new File(tempFilename).length();
704             if (fileSize == 0L) {
705                 throw new IOException("Unable to determine cluster file size.");
706             }
707
708             if (compressClusters) {
709                 fileSize *= compressionFactor;
710             }
711             cluster.setCurrentSize((int) fileSize);
712         }
713     }
714
715
716     /**
717      * Actually commit the specified cluster. This simply renames the temp file
718      * to be the new "original" ones. The rename operation MUST NOT fail.
719      *
720      *
721      * @param cid WizardCluster to be commited.
722      */

723     public synchronized void commitCluster(Transaction ta, ClusterID cid) throws IOException, ClassNotFoundException JavaDoc {
724         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
725             env.logWriter.newEntry(this, "commitCluster(): " + cid, LogWriter.DEBUG3);
726         }
727
728         String JavaDoc basename = basename(cid);
729         File clusterFile = new File(basename + POSTFIX_CLUSTER);
730         File tempFile = new File(basename + POSTFIX_TEMP);
731
732         if (tempFile.exists()) {
733             clusterFile.delete();
734             if (!tempFile.renameTo(clusterFile)) {
735                 throw new IOException("Unable to rename temp cluster.");
736             }
737         }
738
739         // FIXME: if transaction size exceeds cache size, this loads the
740
// cluster again altough it's not really needed
741
// WizardCluster cluster = loadCluster(cid,false);
742

743         // If we do not pin, loadCluster may just loose the loaded cluster due to trim() after load
744
Cluster cluster = loadCluster(cid, true);
745         if (cluster instanceof WizardCluster) {
746             ((WizardCluster)cluster).unpin();
747         }
748         cluster.commit(ta);
749
750         // after the cluster is commited its lock is released and has to be
751
// updated on disk; if no lock file exists, the lock is newly created
752
// when loading
753
updateLockOnDisk(cluster, ta);
754     }
755
756
757     /**
758      * Actually abort the specified cluster. This deletes t
759      * @param cid WizardCluster to be aborted.
760      */

761     public synchronized void abortCluster(Transaction ta, ClusterID cid) throws IOException, ClassNotFoundException JavaDoc {
762         File tempFile = new File(basename(cid) + POSTFIX_TEMP);
763         if (tempFile.exists()) {
764             if (!tempFile.delete()) {
765                 throw new IOException("Unable to delete temp cluster.");
766             }
767         }
768
769         // FIXME: if transaction size exceeds cache size, this loads the
770
// cluster again altough it's not really needed
771
// WizardCluster cluster = loadCluster(cid,false);
772

773         // If we do not pin, loadCluster may just loose the loaded cluster due to trim() after load
774
Cluster cluster = loadCluster(cid, true);
775         boolean isWizardCluster = cluster instanceof WizardCluster;
776         if (isWizardCluster) {
777             ((WizardCluster)cluster).unpin();
778         }
779         cluster.abort(ta);
780
781         if (isWizardCluster && ((WizardCluster)cluster).isPinned() ) { // If the cluster is pinned, it should be reloaded immediately.
782
/*
783                 To other ozone-developers:
784
785                 What do we do if the cluster is pinned and thus may not be removed from memory?
786                 Is this only the case if another transaction is waiting for this cluster to
787                 be unlocked?
788                 If so, should, in this case, the transaction simply reload the cluster?
789             */

790
791             env.logWriter.newEntry(this, "abortCluster(): Unloading pinned cluster " + cid + ". Should we really do that?", LogWriter.DEBUG);
792
793             // the above abort() call does not change the cluster in memory, so
794
// we have to reload the cluster immediately
795
unloadCluster(cid, false);
796             loadCluster(cid, true);
797
798         } else {
799             // the above abort() call does not change the cluster in memory, so
800
// we have to reload the cluster next time
801
unloadCluster(cid, false);
802         }
803
804         // after the cluster is aborted its lock is released and has to be
805
// updated on disk; if no lock file exists, the lock is newly created
806
// when loading
807
updateLockOnDisk(cluster, ta);
808
809     }
810
811
812     protected void updateLockOnDisk(Cluster cluster, Transaction ta) throws IOException {
813         // System.out.println ("commit " + cid + ": " + ((DefaultLock)cluster.lock).lockers.count());
814
ClusterID cid = cluster.clusterID();
815         if (cluster.lock().level(ta) == Lock.LEVEL_NONE) {
816             File lockFile = new File(basename(cid) + POSTFIX_LOCK);
817             if (lockFile.exists() && !lockFile.delete()) {
818                 throw new IOException("Unable to delete lock file.");
819             }
820         } else {
821             storeData(cluster.lock(), basename(cid) + POSTFIX_LOCK);
822         }
823     }
824
825
826     /**
827      * Serialize and store the specified object for the specified key. This
828      * current implementation uses the file system as back end store.
829      */

830     protected void storeData(Object JavaDoc obj, String JavaDoc key) throws IOException {
831         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
832             env.logWriter.newEntry(this, "storeData(): " + key, LogWriter.DEBUG3);
833         }
834
835         OutputStream JavaDoc out = new FileOutputStream(key);
836
837         if (compressClusters) {
838             out = new GZIPOutputStream JavaDoc(out, 3 * 4096);
839         } else {
840             out = new BufferedOutputStream(out, 3 * 4096);
841         }
842
843         ObjectOutputStream oout = new ObjectOutputStream(out);
844         try {
845             oout.writeObject(obj);
846         } finally {
847             oout.close();
848         }
849     }
850
851
852     /**
853      * Load the data that previously has been stored for the given key.
854      */

855     protected Object JavaDoc loadData(String JavaDoc key) throws IOException, ClassNotFoundException JavaDoc {
856         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
857             env.logWriter.newEntry(this, "loadData(): " + key, LogWriter.DEBUG3);
858         }
859
860         InputStream JavaDoc in = new FileInputStream(key);
861
862         if (compressClusters) {
863             in = new GZIPInputStream JavaDoc(in, 3 * 4096);
864         } else {
865             in = new BufferedInputStream(in, 3 * 4096);
866         }
867
868         ObjectInputStream oin = new ResolvingObjectInputStream(in);
869         try {
870             Object JavaDoc result = oin.readObject();
871             return result;
872         } finally {
873             oin.close();
874         }
875     }
876
877 }
878
Popular Tags