KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > ozoneDB > core > storage > gammaStore > Serializer


1 // You can redistribute this software and/or modify it under the terms of
2
// the Ozone Core License version 1 published by ozone-db.org.
3
//
4
// Copyright (C) 2003-@year@, Leo Mekenkamp. All rights reserved.
5
//
6
// $Id: Serializer.java,v 1.3 2004/02/01 20:55:47 leomekenkamp Exp $
7

8 package org.ozoneDB.core.storage.gammaStore;
9
10 import java.io.ByteArrayOutputStream JavaDoc;
11 import java.io.IOException JavaDoc;
12 import java.io.ObjectOutputStream JavaDoc;
13 import java.util.Collections JavaDoc;
14 import java.util.HashSet JavaDoc;
15 import java.util.Iterator JavaDoc;
16 import java.util.LinkedHashMap JavaDoc;
17 import java.util.LinkedHashSet JavaDoc;
18 import java.util.Map JavaDoc;
19 import java.util.Set JavaDoc;
20 import java.util.Map.Entry;
21 import java.util.logging.Level JavaDoc;
22 import java.util.logging.Logger JavaDoc;
23 import org.ozoneDB.OzoneInternalException;
24
25 /**
26  * <p>Takes care of <code>Storable</code> implemeting instances, in that it
27  * has different threads for serializing and writing to <code>Storage</code>.
28  * <code>Remove()</code> ensures that if a null is
29  * returned that the <code>Storable</code> for the given key has been
30  * completely written to <code>Storage</code>.</p>
31  *
32  * <p>For every instance of this class 2 threads are created, so keep this in
33  * mind when creating instances; this might be a resource hog...</p>
34  *
35  * @author leo
36  */

37 public class Serializer {
38     
39     private class SerializeTask implements Runnable JavaDoc {
40     
41         private int pass = 0;
42         
43         private Object JavaDoc id;
44         private Storable storable;
45         byte[] image;
46         
47         SerializeTask(Object JavaDoc id, Storable storable) {
48             this.id = id;
49             this.storable = storable;
50         }
51         
52         Storable getStorable() {
53             return storable;
54         }
55         
56         public String JavaDoc toString() {
57             return "id: " + id + ", pass: " + pass;
58         }
59         
60         public void run() {
61             if (log.isLoggable(Level.FINE)) {
62                 log.fine("serializeTask for " + storable.getStorageName() + ", pass " + pass);
63             }
64             switch (pass) {
65             case 0:
66                 
67                 try {
68                     ByteArrayOutputStream JavaDoc byteStream = new ByteArrayOutputStream JavaDoc();
69                     ObjectOutputStream JavaDoc objectStream;
70                     if (streamFactory == null) {
71                         objectStream = new ObjectOutputStream JavaDoc(byteStream);
72                     } else {
73                         objectStream = new ObjectOutputStream JavaDoc(streamFactory.createOutputStream(byteStream));
74                     }
75                     objectStream.writeObject(storable);
76                     objectStream.close();
77                     image = byteStream.toByteArray();
78                     
79                     pass++;
80                     
81                     synchronized (storeAsyncExec) {
82                         while (storeAsyncExec.size() > 1) {
83                             try {
84                                 storeAsyncExec.wait();
85                             } catch (InterruptedException JavaDoc ignore) {
86                             }
87                         }
88                         if (log.isLoggable(Level.FINE)) {
89                             log.fine("finishing serializeTask for " + storable.getStorageName() + ", pass " + (pass - 1));
90                         }
91                         // Note that the next statement may cause run() to be called
92
// again immediately, so we should do all important stuff
93
// before the next call (including pass++).
94
// Because this instance is in the two AsyncExecs at the
95
// same time for a short time, we never run into
96
// problems with get() when we first ask the one and
97
// then the other AsyncExec.
98
storeAsyncExec.put(id, this);
99                     }
100                 } catch (IOException JavaDoc e) {
101                     if (log.isLoggable(Level.FINE)) {
102                         log.log(Level.FINE, "could not serialize, id: " + id + "; storable: " + storable, e);
103                     }
104                     throw new OzoneInternalException(e);
105                 }
106
107                 break;
108                 
109             case 1:
110             
111                 try {
112                     Storage storage = storageFactory.createStorage(storable.getStorageName());
113                     storage.write(image);
114                     storage.close();
115                 } catch (IOException JavaDoc e) {
116                     log.log(Level.SEVERE, "could not write " + storable.getStorageName(), e);
117                     throw new OzoneInternalException(e);
118                 } finally {
119                     synchronized (storeAsyncExec) {
120                         storeAsyncExec.notifyAll();
121                     }
122                 }
123                 
124                 if (log.isLoggable(Level.FINE)) {
125                     log.fine("finished serializeTask for " + storable.getStorageName() + ", pass " + pass);
126                 }
127                 pass++;
128                 break;
129             default:
130                 throw new OzoneInternalException("Pass " + pass + "? Nobody ever informs me about these things.");
131             }
132         }
133         
134     }
135     
136     
137     private static final Logger JavaDoc log = Logger.getLogger(Serializer.class.getName());
138     
139     private AsyncExec serializeAsyncExec;
140     private AsyncExec storeAsyncExec;
141     
142     private StorageFactory storageFactory;
143     private StreamFactory streamFactory;
144     
145     public Serializer(StorageFactory storageFactory, StreamFactory streamFactory, String JavaDoc name) {
146         this.storageFactory = storageFactory;
147         this.streamFactory = streamFactory;
148         serializeAsyncExec = new AsyncExec(name + " serialize", Thread.MAX_PRIORITY, true);
149         storeAsyncExec = new AsyncExec(name + " store", Thread.MAX_PRIORITY, true);
150     }
151     
152     /**
153      * Places a storable into a write queue. Works like 'Fire and forget':
154      * method returns fast, while other threads take care of serialization and
155      * the actual persisting (writing).
156      */

157     public void put(Object JavaDoc id, Storable storable) {
158         if (log.isLoggable(Level.FINER)) log.finer("enqueueing " + storable + " under id " + id);
159         serializeAsyncExec.put(id, new SerializeTask(id, storable));
160     }
161     
162     /**
163      * Removes a storable from the write queue. If it returns <code>null</code>
164      * then the storable has already been written to disk or has never been
165      * entered through a call to <code>put(Object, Storable)</code>.
166      */

167     public Storable remove(Object JavaDoc id) {
168         
169         // Must remove from both execs to be sure that all data has been
170
// written. If we only check the serializeAsyncExec and if it returns
171
// null then (and only then) check storeAsynchExec we can return before
172
// all data has been written
173
SerializeTask taskA = (SerializeTask) serializeAsyncExec.remove(id);
174         SerializeTask taskB = (SerializeTask) storeAsyncExec.remove(id);
175
176         if (taskA == null) {
177             taskA = taskB;
178         }
179         return taskA == null ? null : taskA.getStorable();
180     }
181     
182     public int size() {
183         return serializeAsyncExec.size() + storeAsyncExec.size();
184     }
185     
186     public void stopWhenReady() {
187         serializeAsyncExec.stopWhenReady();
188         storeAsyncExec.stopWhenReady();
189     }
190     
191 }
192
Popular Tags