KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > ozoneDB > core > storage > magicStore > ClusterStore


1 // You can redistribute this software and/or modify it under the terms of
2
// the Ozone Core License version 1 published by ozone-db.org.
3
//
4
// The original code and portions created by SMB are
5
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
6
//
7
// $Id: ClusterStore.java,v 1.11 2004/03/10 14:59:48 leomekenkamp Exp $
8

9 package org.ozoneDB.core.storage.magicStore;
10
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.ozoneDB.Setup;
import org.ozoneDB.DxLib.DxHashMap;
import org.ozoneDB.DxLib.DxHashSet;
import org.ozoneDB.DxLib.DxIterator;
import org.ozoneDB.DxLib.DxMap;
import org.ozoneDB.core.Env;
import org.ozoneDB.core.Lock;
import org.ozoneDB.core.MROWLock;
import org.ozoneDB.core.Permissions;
import org.ozoneDB.core.Transaction;
import org.ozoneDB.core.TransactionID;
import org.ozoneDB.core.storage.AbstractClusterStore;
import org.ozoneDB.core.storage.Cache;
import org.ozoneDB.core.storage.Cluster;
import org.ozoneDB.core.storage.ClusterID;
import org.ozoneDB.core.storage.StorageObjectContainer;
import org.ozoneDB.core.storage.SoftReferenceCache;
import org.ozoneDB.io.stream.ResolvingObjectInputStream;
import org.ozoneDB.util.LogWriter;
47
48
49 /**
50  * The ClusterStore is the back-end store of the magicStore. It maintains the
51  * cluster cache, activation/passivation and the actual persistent commits.
52  *
53  *
54  * @author <a HREF="http://www.softwarebuero.de/">SMB</a>
55  * @author <a HREF="http://www.medium.net/">Medium.net</a>
56  * @author Leo Mekenkamp
57  * @author Per Nyfelt
58  * @version $Revision: 1.11 $Date: 2004/03/10 14:59:48 $
59  */

60 public final class ClusterStore extends AbstractClusterStore {
61
62     // note: change splitClusterName() when you change POSTFIX_SEPARATOR
63
final static String JavaDoc POSTFIX_NEW = POSTFIX_SEPARATOR + "new";
64     private final static String JavaDoc POSTFIX_OLD = POSTFIX_SEPARATOR + "old";
65
66     protected final static int compressionFactor = 3;
67
68 // protected DxMap cachedClusters;
69
private transient Cache clusterCache;
70
71     protected int maxClusterSize = 64 * 1024;
72
73     /**
74      * Table that maps Permissions to ClusterIDs.
75      */

76     protected DxMap growingClusterIDs;
77
78     private boolean compressClusters;
79
80     private MagicStore magicStore;
81
82
83     ClusterStore(Env _env) {
84         super(_env);
85         maxClusterSize = env.config.intProperty(Setup.WS_CLUSTER_SIZE, -1);
86
87 // // TODO: pass properties from config to ctor
88
clusterCache = new SoftReferenceCache();
89
90         compressClusters = env.config.booleanProperty(Setup.WS_COMPRESS_CLUSTERS, true);
91     }
92
93
94     MagicStore getMagicStore() {
95         return magicStore;
96     }
97
98     void setMagicStore(MagicStore _magicStore) {
99         this.magicStore = _magicStore;
100     }
101
102     public void startup() throws Exception JavaDoc {
103         growingClusterIDs = new DxHashMap(32);
104     }
105
106
107     public void shutdown() {
108     }
109
110
111     /**
112      * Check if the ClusterStore was cleanly shutted down.
113      */

114     public boolean isCleanShutdown() {
115         File JavaDoc file = new File JavaDoc(env.getDatabaseDir() + Env.DATA_DIR);
116         String JavaDoc[] fileList = file.list();
117
118         for (int i = 0; i < fileList.length; i++) {
119             if (fileList[i].endsWith(POSTFIX_NEW) || fileList[i].endsWith(POSTFIX_OLD)) {
120                 return false;
121             }
122         }
123         return true;
124     }
125
126
127     /**
128      * Search the DATA dir and recover all ClusterIDs.
129      */

130     public Set JavaDoc recoverClusterIDs() {
131         File JavaDoc file = new File JavaDoc(env.getDatabaseDir() + Env.DATA_DIR);
132         String JavaDoc[] fileList = file.list();
133
134         Set JavaDoc result = new HashSet JavaDoc();
135         for (int i = 0; i < fileList.length; i++) {
136             if (fileList[i].endsWith(POSTFIX_CLUSTER) || fileList[i].endsWith(POSTFIX_NEW)|| fileList[i].endsWith(POSTFIX_OLD)) {
137                 String JavaDoc cidString = fileList[i].substring(0, fileList[i].indexOf(POSTFIX_SEPARATOR));
138                 long cid = Long.parseLong(cidString);
139                 result.add(new ClusterID(cid));
140             }
141         }
142         return result;
143     }
144
145     /**
146      * Returns a set containing the IDs of all tx-es that for one reason or
147      * another have not committed.
148      */

149     Set JavaDoc uncommittedTaIDs() {
150         File JavaDoc file = new File JavaDoc(env.getDatabaseDir() + Env.DATA_DIR);
151         File JavaDoc[] fileList = file.listFiles(new FilenameFilter JavaDoc() {
152             public boolean accept(File JavaDoc dir, String JavaDoc name) {
153                 return name.endsWith(POSTFIX_NEW);
154             }
155         });
156         Set JavaDoc result = new HashSet JavaDoc();
157         for (int i = 0; i < fileList.length; i++) {
158             String JavaDoc[] parts = splitClusterName(fileList[i].getName());
159             long taID = Long.parseLong(parts[parts.length - 2]);
160             result.add(new TransactionID(taID));
161         }
162         return result;
163     }
164
165
166     private static String JavaDoc[] splitClusterName(String JavaDoc clusterName) {
167         // need to escape the regexp escape char; need escape char
168
// because '.' is an regexp special char
169
return clusterName.split("\\" + POSTFIX_SEPARATOR);
170     }
171
172     public long currentCacheSize() {
173         return clusterCache.size();
174     }
175
176
177     public int currentBytesPerContainer() {
178         int result = env.config.intProperty(Setup.WS_CLUSTER_SIZE_RATIO, 256);
179 // env.logWriter.newEntry( this, "currentBytesPerContainer(): setup:" + result, LogWriter.DEBUG );
180
return result;
181     }
182
183     /**
184      * @param perms Permissions of the cluster to search.
185      * @return MagicCluster with the specified permissions that is good to store a
186      * new container in it.
187      */

188     protected synchronized Cluster growingCluster(Permissions perms, MagicTransaction ta) throws Exception JavaDoc {
189         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
190             env.logWriter.newEntry(this, "growingCluster() ", LogWriter.DEBUG3);
191         }
192
193         Cluster cluster = null;
194         ClusterID cid = (ClusterID) growingClusterIDs.elementForKey(perms);
195
196         // load the current growing cluster and check space
197
if (cid != null) {
198             cluster = (Cluster) clusterCache.get(cid);
199             if (cluster == null) {
200                 cluster = loadCluster(cid, ta);
201             }
202             // check cluster size and if it was deactivated by the trimCache();
203
// use this cluster only if it isn't used by another ta
204
if (cluster.lock() == null || cluster.size() >= maxClusterSize ||
205                     cluster.lock().level(null) > Lock.LEVEL_NONE && !cluster.lock().isAcquiredBy(env.transactionManager.currentTA())) {
206
207                 if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
208                     env.logWriter.newEntry(this,
209                             "growingCluster(): growing cluster not usable: cid=" + cluster.clusterID() + " size=" + cluster.size() + " lockLevel=" +
210                             (cluster.lock() != null ? String.valueOf(cluster.lock().level(null)) : "null"),
211                             LogWriter.DEBUG1);
212                 }
213
214                 growingClusterIDs.removeForKey(perms);
215                 cluster = null;
216             }
217         }
218
219         // search all currently loaded clusters
220
if (cluster == null) {
221             for (Iterator JavaDoc i = clusterCache.copyToMap().values().iterator(); i.hasNext(); ) {
222                 Cluster cursor = (Cluster) i.next();
223
224                 // System.out.println (cursor.size());
225
if (cursor.size() < maxClusterSize && cursor.permissions().equals(perms)) {
226                     cluster = cursor;
227
228                     // check if the cluster deactivated be the ensureCacheSpace
229
if (cluster.lock() == null) {
230                         env.logWriter.newEntry(this,
231                                 "growingCluster(): loaded cluster was deactivated: " + cluster.clusterID(),
232                                 LogWriter.DEBUG);
233                         cluster = null;
234                     } else if (cluster.lock().level(null) > Lock.LEVEL_NONE && !cluster.lock().isAcquiredBy(
235                             env.transactionManager.currentTA())) {
236                         // use this cluster only if it isn't used by another ta
237
if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
238                             env.logWriter.newEntry(this,
239                                     "growingCluster(): loaded cluster is locked by another transaction: "
240                                     + cluster.clusterID(), LogWriter.DEBUG1);
241                         }
242                         cluster = null;
243                     } else {
244                         growingClusterIDs.addForKey(cluster.clusterID(), perms);
245                         if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
246                             env.logWriter.newEntry(this,
247                                     "growingCluster(): loaded cluster is now growing cluster: " + cluster.clusterID()
248                                     + " size:" + cluster.size(), LogWriter.DEBUG1);
249                         }
250                         break;
251                     }
252                 }
253             }
254         }
255
256         // write a new, empty cluster and load it just after to ensures
257
// that new cluster is "regularly" loaded
258
if (cluster == null) {
259             cluster = createANewEmptyAndUsableCluster(perms);
260         }
261
262         return cluster;
263     }
264
265     /**
266      Creates a cluster which is
267      <UL>
268      <LI>new</LI>
269      <LI>empty</LI>
270      <LI>usable and</LI>
271      <LI>not locked</LI>
272      </UL>
273      */

274     protected synchronized Cluster createANewEmptyAndUsableCluster(Permissions perms) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
275 // env.logWriter.newEntry( this, "growingCluster(): creating new cluster...", LogWriter.DEBUG );
276
Cluster cluster = new MagicCluster(new ClusterID(env.keyGenerator.nextID()), perms, (MROWLock) env.transactionManager.newLock(), 256);
277
278         activateCluster(cluster, 100);
279         clusterCache.put(cluster.clusterID(), cluster);
280
281         growingClusterIDs.addForKey(cluster.clusterID(), perms);
282 // env.logWriter.newEntry( this, "growingCluster(): new cluster created: " + cluster.clusterID(), LogWriter.DEBUG );
283

284         return cluster;
285     }
286
287     /**
288      Returns or creates a cluster which is not locked so that locking it will succeed.
289      The returned cluster is only guaranteed to be not locked by any other thread as long as this
290      method is called during synchronization to this ClusterStore.
291      */

292     protected Cluster giveMeAnUnlockedCluster(Permissions perms) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
293         return createANewEmptyAndUsableCluster(perms);
294     }
295
296     /**
297      Associates the specified container with a cluster.
298
299      Iff this method returns normally (without exception), the container (and thus the cluster of the container)
300      is write locked
301
302      @param container Container to be registered with one cluster.
303      */

304     public void registerContainerAndLock(StorageObjectContainer container, Permissions perms, Transaction locker, int lockLevel) throws Exception JavaDoc {
305         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
306             env.logWriter.newEntry(this, "registerContainer()", LogWriter.DEBUG3);
307         }
308
309         Cluster cluster = null;
310
311         boolean locked = false;
312         boolean alright = false;
313
314         try {
315             synchronized (this) {
316                 MagicTransaction ta = (MagicTransaction) env.transactionManager.currentTA();
317                 cluster = growingCluster(perms, ta);
318
319                 Lock clusterLock = cluster.lock();
320                 int prevLevel = clusterLock.tryAcquire(locker, lockLevel);
321
322                 if (prevLevel == Lock.NOT_ACQUIRED) { // The cluster we are trying to lock is already locked, so we take another cluster
323
cluster = giveMeAnUnlockedCluster(perms);
324
325                     clusterLock = cluster.lock();
326                     prevLevel = clusterLock.tryAcquire(locker, lockLevel);
327
328                     if (prevLevel == Lock.NOT_ACQUIRED) {
329                         throw new Error JavaDoc("BUG! We could not acquire a lock for an unlocked cluster.");
330                     }
331                 }
332                 locked = true;
333
334                 cluster.registerContainer(container);
335             }
336             cluster.updateLockLevel(locker);
337
338             if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
339                 env.logWriter.newEntry(this, " cluster: " + cluster.clusterID(), LogWriter.DEBUG3);
340             }
341             alright = true;
342         } finally {
343             if (!alright) {
344                 if (locked) {
345                     cluster.lock().release(locker);
346                 }
347             }
348         }
349     }
350
351
352     public void invalidateContainer(StorageObjectContainer container) {
353         synchronized (container) {
354             container.getCluster().removeContainer(container);
355             container.setCluster(null);
356         }
357     }
358
359
360     protected Cluster restoreCluster(final ClusterID cid, Set JavaDoc uncommittedTaIDs) throws Exception JavaDoc {
361         String JavaDoc basename = basename(cid);
362         Cluster cluster;
363
364         new File JavaDoc(basename + POSTFIX_LOCK).delete();
365
366         File JavaDoc dir = new File JavaDoc(env.getDatabaseDir() + Env.DATA_DIR);
367         File JavaDoc[] oldFileList = dir.listFiles(new FilenameFilter JavaDoc() {
368             public boolean accept(File JavaDoc dir, String JavaDoc name) {
369                 return name.startsWith(cid.value() + POSTFIX_SEPARATOR) && name.endsWith(POSTFIX_OLD);
370             }
371         });
372
373         File JavaDoc[] newFileList = dir.listFiles(new FilenameFilter JavaDoc() {
374             public boolean accept(File JavaDoc dir, String JavaDoc name) {
375                 return name.startsWith(cid.value() + POSTFIX_SEPARATOR) && name.endsWith(POSTFIX_OLD);
376             }
377         });
378
379         File JavaDoc clusterFile = new File JavaDoc(basename + POSTFIX_CLUSTER);
380
381         if (oldFileList.length == 0) {
382             if (newFileList.length == 1) {
383
384                 // there is only a new file and no old file, so the new file
385
// is simply uncommitted data and we delete it
386
newFileList[0].delete();
387             }
388         } else if (oldFileList.length == 1) {
389             long num = Long.parseLong(splitClusterName(oldFileList[0].getName())[1]);
390             TransactionID taID = new TransactionID(num);
391             if (uncommittedTaIDs.contains(taID)) {
392                 if (newFileList.length == 1) {
393
394                     // we have both an old and a new file, but the tx that has
395
// written these files has not fully committed, so we can
396
// delete the new file here and rename the old file later
397
newFileList[0].delete();
398                 } else {
399
400                     // there is no new file but we do have an old file and the
401
// tx has not fully committed but has been able to rename
402
// new into cluster, so we delete the cluster and rename the
403
// old file later
404
clusterFile.delete();
405                 }
406                 if (!oldFileList[0].renameTo(clusterFile)) {
407                     throw new IOException JavaDoc("Unable to rename old cluster file " + oldFileList[0] + " to " + clusterFile);
408                 }
409
410             } else {
411
412                 // we need not bother checking newFileList at this point, since
413
// uncommittedTaIDs.contains(taID) can only be false when there
414
// is no new cluster whatsoever (not only the cluster we
415
// currently look at) on disk with the taID in its filename
416
if (oldFileList.length == 1) {
417
418                     // this might happen, since a tx that has renamed all new
419
// files to cluster files can be considered to have
420
// committed; note that a client is only notified of a
421
// commit when all old files have been deleted as well ->
422
// this is an evil that may exist, since a crash can happen
423
// at any time (also between finishing the commit and
424
// informing any client the commit has taken place)
425
oldFileList[0].delete();
426                 }
427
428             }
429         }
430
431         cluster = (Cluster) loadData(basename + POSTFIX_CLUSTER);
432         activateCluster(cluster, 0);
433
434         return cluster;
435     }
436
437
438     /**
439      * Make sure the corresponding cluster is in the cache. While loading
440      * clusters, we may have to throw away (and maybe store) some currently
441      * cached clusters.
442      *
443      *
444      * @param cid ClusterID of the cluster to load.
445      */

446 // public Cluster loadCluster(ClusterID cid) throws IOException, ClassNotFoundException {
447
// return loadCluster(cid, null);
448
// }
449

450     public Cluster loadCluster(ClusterID cid, MagicTransaction ta) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
451         Cluster cluster = (Cluster) clusterCache.get(cid);
452         if (cluster == null) {
453             if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
454                 env.logWriter.newEntry(this, "loadCluster(): load cluster from disk: " + cid.toString(), LogWriter.DEBUG);
455             }
456
457             final String JavaDoc basename = basename(cid);
458             String JavaDoc newClusterName = ta == null ? null : basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW;
459             String JavaDoc uncommittedClusterName = ta == null ? null : basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_CLUSTER;
460             String JavaDoc currentClusterName = basename + POSTFIX_CLUSTER;
461             String JavaDoc lockName = basename + POSTFIX_LOCK;
462
463             String JavaDoc clusterName = null;
464             
465             if (new File JavaDoc(uncommittedClusterName).exists()) {
466                 clusterName = uncommittedClusterName;
467             } else if (new File JavaDoc(lockName).exists()) {
468                 MROWLock lock = (MROWLock) loadData(lockName);
469                 TransactionID lockerID = lock.getWriteLockingTransactionID();
470                 clusterName = basename + POSTFIX_SEPARATOR + lockerID.value() + POSTFIX_CLUSTER;
471             } else if (new File JavaDoc(currentClusterName).exists()) {
472                 clusterName = currentClusterName;
473             } else if (new File JavaDoc(newClusterName).exists()) {
474                 clusterName = newClusterName;
475             }
476             cluster = (Cluster) loadData(clusterName);
477                 
478             int clusterByteSize = (int) new File JavaDoc(clusterName).length();
479             if (compressClusters) {
480                 clusterByteSize *= compressionFactor;
481             }
482
483             env.logWriter.newEntry(this, "loaded data = " + cluster.getClass().getName(), LogWriter.DEBUG);
484
485             synchronized (this) {
486
487                 // now we have to check the cachedClusters table inside the
488
// synchronized block to see if someone did register this
489
// cluster while we loaded it
490
Cluster interimCluster = (Cluster) clusterCache.get(cid);
491                 if (interimCluster != null) {
492                     env.logWriter.newEntry(this, "loadCluster(): cluster was loaded by another thread too; droping my copy", LogWriter.DEBUG);
493
494                     cluster = interimCluster;
495
496                 } else {
497                     // we are going to mess with the cluster; it seems that the cluster
498
// is not visible to other thread until it is added to cachedClusters,
499
// however, IBM jdk throws an exception in cluster.updateLockLevel, which
500
// seems to be related to the initialization in the following block
501
synchronized (cluster) {
502                         // locks are only there if the lock level is >= READ
503
File JavaDoc lockFile = new File JavaDoc(lockName);
504                         if (lockFile.exists()) {
505                             cluster.setLock((Lock) loadData(lockName));
506                             if (!lockFile.delete()) {
507                                 env.logWriter.newEntry(this, "could not delete lock file " + lockFile, LogWriter.ERROR);
508                             }
509                         } else {
510                             if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
511                                 env.logWriter.newEntry(this, "no lock on disk for " + cid + ", creating a new lock.", LogWriter.DEBUG3);
512                             }
513                             cluster.setLock(env.transactionManager.newLock());
514                         }
515                         ((MROWLock) cluster.lock()).setDebugInfo("clusterID=" + cluster.clusterID());
516
517
518                         activateCluster(cluster, clusterByteSize);
519                     }
520
521                     if (clusterByteSize > maxClusterSize * 2) {
522                         splitCluster(cluster);
523                     }
524
525                     clusterCache.put(cluster.clusterID(), cluster);
526                 }
527             }
528         }
529         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
530             env.logWriter.newEntry(this, "returning MagicCluster: " + cluster, LogWriter.DEBUG3);
531         }
532         return cluster;
533     }
534
535
536     public void splitCluster(Cluster cluster) {
537         // todo: implement?
538
}
539
540
541     /**
542      * Remove cluster from the cluster cache.
543      * @param cid
544      */

545     public void unloadCluster(ClusterID cid, boolean deactivate) throws IOException JavaDoc {
546         if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
547             env.logWriter.newEntry(this, "unloadCluster(" + cid + "," + deactivate + ").", LogWriter.DEBUG);
548         }
549         Cluster cluster = (Cluster) clusterCache.remove(cid);
550
551         if (deactivate) {
552             deactivateCluster(cluster);
553         }
554     }
555
556
557     /**
558      * This method is called right after the specified MagicCluster was loaded from
559      * disk.
560      */

561     protected void activateCluster(Cluster cluster, int size) {
562         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
563             env.logWriter.newEntry(this, "activateCluster(): " + cluster.clusterID(), LogWriter.DEBUG3);
564         }
565         cluster.setEnv(env);
566         cluster.setClusterStore(this);
567         cluster.touch();
568         cluster.setCurrentSize(size);
569     }
570
571
572     /**
573      * Deactivate the specified cluster before it is written to disk. The
574      * specified cluster will be removed from the cluster cache. If it currently
575      * has shadows, they are written to disk. If any of the containers are
576      * currently invoked (should normally never happen), the shadows must stay
577      * in memory.
578      */

579     protected void deactivateCluster(Cluster cluster) throws IOException JavaDoc {
580         if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
581             env.logWriter.newEntry(this,
582                     "deactivateCluster(): " + cluster.clusterID() + " priority: " + cluster.cachePriority(),
583                     LogWriter.DEBUG);
584             env.logWriter.newEntry(this, " lock: " + cluster.lock().level(null), LogWriter.DEBUG);
585         }
586
587         String JavaDoc basename = basename(cluster.clusterID());
588
589         synchronized (this) { // We synchronize on this ClusterStore so that a freshly returned cluster within the ClusterStore lock may not be deactivated during the lock time.
590
}
591     }
592
593
594     /**
595      * Store the specified cluster on disk. Write new files first. If this
596      * write fails, the original are still valid. The cluster may has been
597      * written to the disk already, if is was deactivated while transaction.
598      * But in case the cluster (and its changes) are only in memory, we have to
599      * write now to check if this is possible without errors.
600      *
601      * Note: This only writes all currently commited transaction results to the
602      * disk. This is different from the deactivation behaviour.
603      *
604      *
605      * @param cid MagicCluster to be prepare-commited.
606      * @exception IOException None of the clusters are written to disk.
607      */

608     public synchronized void prepareCommitCluster(Transaction ta, ClusterID cid) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
609         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
610             env.logWriter.newEntry(this, "prepareCommitCluster(): " + cid, LogWriter.DEBUG3);
611         }
612
613         Cluster cluster = loadCluster(cid, (MagicTransaction) ta);
614         cluster.prepareCommit(ta);
615
616         String JavaDoc basename = basename(cid);
617         File JavaDoc clusterFile = new File JavaDoc(basename + POSTFIX_CLUSTER);
618         File JavaDoc oldFile = new File JavaDoc(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_OLD);
619
620         if (cluster.lock().level(null) >= Lock.LEVEL_WRITE) {
621             // need to check if the cluster file exists, it does not when the cluster
622
// is created during transaction ta
623
if (clusterFile.exists() && !clusterFile.renameTo(oldFile)) {
624                 throw new IOException JavaDoc("Unable to rename cluster file " + clusterFile + " to " + oldFile);
625             }
626             String JavaDoc tempFilename = basename(cid) + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW;
627             storeDataImmediately(cluster, tempFilename);
628         }
629     }
630
631
632     /**
633      * @param cid MagicCluster to be commited.
634      */

635     public synchronized void commitCluster(Transaction ta, ClusterID cid) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
636         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
637             env.logWriter.newEntry(this, "commitCluster(): " + cid, LogWriter.DEBUG3);
638         }
639
640         String JavaDoc basename = basename(cid);
641         File JavaDoc clusterFile = new File JavaDoc(basename + POSTFIX_CLUSTER);
642         File JavaDoc oldFile = new File JavaDoc(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_OLD);
643         File JavaDoc newFile = new File JavaDoc(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW);
644         File JavaDoc uncommittedFile = new File JavaDoc(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_CLUSTER);
645
646         // newFile only exists if it has been written to disk prior to commit
647
if (newFile.exists() && !newFile.renameTo(clusterFile)) {
648             throw new IOException JavaDoc("Unable to rename cluster file " + newFile + " to " + clusterFile);
649         }
650
651         // oldFile does not exist if cluster was created during transaction ta
652
if (oldFile.exists() && !oldFile.delete()) {
653             throw new IOException JavaDoc("Unable to delete old cluster file " + oldFile);
654         }
655         Cluster cluster = loadCluster(cid, (MagicTransaction) ta);
656         cluster.commit(ta);
657
658         if (uncommittedFile.exists() && !uncommittedFile.delete()) {
659             throw new IOException JavaDoc("Unable to delete uncommitted cluster file " + uncommittedFile);
660         }
661
662         // after the cluster is commited its lock is released and has to be
663
// updated on disk; if no lock file exists, the lock is newly created
664
// when loading
665
updateLockOnDisk(cluster, ta);
666     }
667
668
669     /**
670      * Actually abort the specified cluster. This deletes t
671      * @param cid MagicCluster to be aborted.
672      */

673     public synchronized void abortCluster(Transaction ta, ClusterID cid) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
674         String JavaDoc basename = basename(cid);
675         File JavaDoc newFile = new File JavaDoc(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW);
676         File JavaDoc clusterFile = new File JavaDoc(basename + POSTFIX_CLUSTER);
677         File JavaDoc oldFile = new File JavaDoc(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_OLD);
678         if (newFile.exists() && !newFile.delete()) {
679             throw new IOException JavaDoc("Unable to delete new cluster file " + newFile);
680         }
681         if (oldFile.exists()) {
682             if (clusterFile.exists() && !clusterFile.delete()) {
683                 throw new IOException JavaDoc("Unable to delete cluster file " + clusterFile);
684             }
685             if (!oldFile.renameTo(clusterFile)) {
686                 throw new IOException JavaDoc("Unable to rename old cluster file " + oldFile + " to " + clusterFile);
687             }
688         }
689
690         // FIXME: if transaction size exceeds cache size, this loads the
691
// cluster again altough it's not really needed
692
// MagicCluster cluster = loadCluster(cid,false);
693

694         Cluster cluster = loadCluster(cid, (MagicTransaction) ta);
695         cluster.abort(ta);
696
697         // the above abort() call does not change the cluster in memory, so
698
// we have to reload the cluster next time
699
unloadCluster(cid, false);
700
701         // after the cluster is aborted its lock is released and has to be
702
// updated on disk; if no lock file exists, the lock is newly created
703
// when loading
704
updateLockOnDisk(cluster, ta);
705
706     }
707
708
709     protected void updateLockOnDisk(Cluster cluster, Transaction ta) throws IOException JavaDoc {
710         // System.out.println ("commit " + cid + ": " + ((DefaultLock)cluster.lock).lockers.count());
711
ClusterID cid = cluster.clusterID();
712         if (cluster.lock().level(ta) == Lock.LEVEL_NONE) {
713             File JavaDoc lockFile = new File JavaDoc(basename(cid) + POSTFIX_LOCK);
714             if (lockFile.exists() && !lockFile.delete()) {
715                 throw new IOException JavaDoc("Unable to delete lock file.");
716             }
717         } else {
718             storeDataImmediately(cluster.lock(), basename(cid) + POSTFIX_LOCK);
719         }
720     }
721
722
723     /*
724      * Since we dropped the whole pinning idea we can do all writing in a
725      * separate thread, as long as there are hard references to a cluster, that
726      * cluster can be found in the clusterCache.
727      */

728     private class StoreThread extends Thread JavaDoc {
729
730         private class Entry {
731
732             public String JavaDoc key;
733             public Object JavaDoc value;
734
735             public Entry(String JavaDoc key, Object JavaDoc value) {
736                 this.value = value;
737                 this.key = key;
738             }
739         }
740
741         private LinkedList JavaDoc storeList = new LinkedList JavaDoc();
742         private volatile boolean stopRunning;
743
744         public void stopRunning() {
745             stopRunning = true;
746             synchronized (storeList) {
747                 storeList.notifyAll();
748             }
749         }
750
751         public void storeData(Object JavaDoc obj, String JavaDoc key) {
752             if (stopRunning) {
753                 // TODO: replace with proper exception
754
throw new RuntimeException JavaDoc("cannot call storeData() after stopRunning()");
755             }
756
757             synchronized (storeList) {
758                 // It might happen that a tx is creating objects like crazy,
759
// thereby creating clusters like crazy. When there are more
760
// clusters being created than written, we run out of memory. So
761
// we need to keep the number of clusters that is to be written
762
// to disk not too high
763
// TODO: make some parameter out of this
764
while (storeList.size() >= 10) {
765                     try {
766                         storeList.wait();
767                     } catch (InterruptedException JavaDoc ignore) {
768                     }
769                 }
770                 storeList.addLast(new Entry(key, obj));
771                 storeList.notifyAll();
772             }
773         }
774
775         public void run() {
776             for (Entry entry = null; !stopRunning && entry == null; ) {
777                 synchronized (storeList) {
778                     if (storeList.size() > 0) {
779                         entry = (Entry) storeList.removeFirst();
780                         storeList.notifyAll();
781                     } else {
782                         try {
783                             storeList.wait();
784                         } catch (InterruptedException JavaDoc ignore) {
785                         }
786                     }
787                 }
788                 if (entry != null) {
789                     try {
790                         storeDataImmediately(entry.value, entry.key);
791                     } catch (IOException JavaDoc e) {
792                         env.logWriter.newEntry(this, "could not write: " + entry.value + ", filename: " + entry.key, e, LogWriter.ERROR);
793                     }
794                 }
795             }
796         }
797     }
798
799     private StoreThread storeThread = new StoreThread();
800
801     /**
802      * Serialize and store the specified object for the specified key. This
803      * current implementation uses the file system as back end store.
804      */

805     protected void storeData(Object JavaDoc obj, String JavaDoc key) throws IOException JavaDoc {
806         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
807             env.logWriter.newEntry(this, "storeData(): " + key, LogWriter.DEBUG3);
808         }
809
810         storeThread.storeData(obj, key);
811     }
812
813     protected void storeDataImmediately(Object JavaDoc obj, String JavaDoc key) throws IOException JavaDoc {
814         OutputStream JavaDoc out = new FileOutputStream JavaDoc(key);
815
816         if (compressClusters) {
817             out = new GZIPOutputStream JavaDoc(out, 3 * 4096);
818         } else {
819             out = new BufferedOutputStream JavaDoc(out, 3 * 4096);
820         }
821
822         ObjectOutputStream JavaDoc oout = new ObjectOutputStream JavaDoc(out);
823         try {
824             oout.writeObject(obj);
825         } finally {
826             oout.close();
827         }
828     }
829
830     /**
831      * Load the data that previously has been stored for the given key.
832      */

833     protected Object JavaDoc loadData(String JavaDoc key) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
834         if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
835             env.logWriter.newEntry(this, "loadData(): " + key, LogWriter.DEBUG3);
836         }
837
838         InputStream JavaDoc in = new FileInputStream JavaDoc(key);
839
840         if (compressClusters) {
841             in = new GZIPInputStream JavaDoc(in, 3 * 4096);
842         } else {
843             in = new BufferedInputStream JavaDoc(in, 3 * 4096);
844         }
845
846         ObjectInputStream JavaDoc oin = new ResolvingObjectInputStream(in);
847         try {
848             Object JavaDoc result = oin.readObject();
849             return result;
850         } finally {
851             oin.close();
852         }
853     }
854
855
    /**
     * Roll back every cluster touched by the given transaction. Clusters the
     * transaction holds more than a read lock on are aborted (their on-disk
     * state restored via abortCluster()); read-locked clusters are merely
     * unlocked, since other transactions may hold read locks on them too.
     */
    void abortTransaction(MagicTransaction ta) throws IOException, ClassNotFoundException {
        // records the clusters handled so far, so each cluster is processed
        // only once even if idTable yields its ID multiple times
        ta.commitClusterIDs = new DxHashSet(64);

        DxIterator it = ta.idTable.iterator();
        ClusterID cid;
        while ((cid = (ClusterID) it.next()) != null) {
            if (!ta.commitClusterIDs.contains(cid)) {
                // We MUST NOT abort read locked clusters (because they may be read locked from other transactions, too)
                Cluster cluster = loadCluster(cid, ta);

                if (cluster.lock().level(ta) > Lock.LEVEL_READ) {
                    if (env.logWriter.hasTarget(LogWriter.DEBUG2)) {
                        env.logWriter.newEntry(this, "abort cluster: " + cid, LogWriter.DEBUG2);
                    }

                    abortCluster(ta, cid);
                } else {
                    // Do a plain unlock, and we are fine.
                    cluster.lock().release(ta);
                }
                ta.commitClusterIDs.add(cid);
            }
        }
    }
881 }
882
Popular Tags