package org.ozoneDB.core.storage.gammaStore;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.ozoneDB.OzoneInternalException;

/**
 * Turns {@link Storable} instances into byte images and writes those images
 * to a {@link Storage}, in two passes that are handed to two separate
 * {@link AsyncExec} instances: one for serializing, one for storing.
 *
 * <p>NOTE(review): <code>AsyncExec</code> is not visible in this file; it is
 * presumably a keyed queue that runs the queued <code>Runnable</code>s on its
 * own thread - confirm against its implementation.</p>
 */
public class Serializer {

    /**
     * Unit of work for one storable. The same instance is run twice: pass 0
     * serializes the storable into {@link #image} and re-queues the task on
     * the store executor, pass 1 writes the image to storage.
     */
    private class SerializeTask implements Runnable {

        /** Number of passes completed so far; selects the work done by {@link #run()}. */
        private int pass = 0;

        private Object id;
        private Storable storable;

        /** Serialized form of {@link #storable}; assigned at the end of pass 0. */
        byte[] image;

        SerializeTask(Object id, Storable storable) {
            this.id = id;
            this.storable = storable;
        }

        Storable getStorable() {
            return storable;
        }

        public String toString() {
            return "id: " + id + ", pass: " + pass;
        }

        /**
         * Executes the pass indicated by {@link #pass}.
         *
         * @throws OzoneInternalException if serializing or writing fails with
         *         an <code>IOException</code>, or if the task is run more than
         *         twice
         */
        public void run() {
            if (log.isLoggable(Level.FINE)) {
                log.fine("serializeTask for " + storable.getStorageName() + ", pass " + pass);
            }
            switch (pass) {
            case 0:
                try {
                    image = serialize();
                } catch (IOException e) {
                    if (log.isLoggable(Level.FINE)) {
                        log.log(Level.FINE, "could not serialize, id: " + id + "; storable: " + storable, e);
                    }
                    throw new OzoneInternalException(e);
                }
                pass++;
                handOverToStore();
                break;

            case 1:
                store();
                if (log.isLoggable(Level.FINE)) {
                    log.fine("finished serializeTask for " + storable.getStorageName() + ", pass " + pass);
                }
                pass++;
                break;

            default:
                throw new OzoneInternalException("Pass " + pass + "? Nobody ever informs me about these things.");
            }
        }

        /**
         * Serializes {@link #storable} into a byte array, routing the bytes
         * through the stream supplied by the stream factory when one is set.
         *
         * @return the serialized bytes
         * @throws IOException if creating a stream or writing the object fails
         */
        private byte[] serialize() throws IOException {
            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
            OutputStream target = byteStream;
            if (streamFactory != null) {
                target = streamFactory.createOutputStream(byteStream);
            }

            boolean completed = false;
            try {
                ObjectOutputStream objectStream = new ObjectOutputStream(target);
                objectStream.writeObject(storable);
                // Closing flushes everything into byteStream and closes target as well.
                objectStream.close();
                completed = true;
            } finally {
                if (!completed) {
                    // Release whatever the factory-created stream holds, without
                    // letting a secondary close failure hide the original one.
                    try {
                        target.close();
                    } catch (IOException ignored) {
                        // the exception already in flight is the one worth reporting
                    }
                }
            }
            return byteStream.toByteArray();
        }

        /**
         * Queues this task on the store executor, first blocking for as long
         * as that executor reports more than one entry. Waiting threads are
         * woken by the <code>notifyAll()</code> issued at the end of every
         * pass 1.
         */
        private void handOverToStore() {
            synchronized (storeAsyncExec) {
                while (storeAsyncExec.size() > 1) {
                    try {
                        storeAsyncExec.wait();
                    } catch (InterruptedException ignore) {
                        // Deliberately ignored, as before: the loop simply re-checks
                        // the queue size and keeps waiting if it is still too large.
                    }
                }
                if (log.isLoggable(Level.FINE)) {
                    log.fine("finishing serializeTask for " + storable.getStorageName() + ", pass " + (pass - 1));
                }
                storeAsyncExec.put(id, this);
            }
        }

        /**
         * Writes {@link #image} to a storage created under the storable's
         * storage name. The storage is closed even when the write fails, and
         * threads waiting on the store executor are notified in every case.
         *
         * @throws OzoneInternalException if creating, writing or closing the
         *         storage fails with an <code>IOException</code>
         */
        private void store() {
            try {
                Storage storage = storageFactory.createStorage(storable.getStorageName());
                try {
                    storage.write(image);
                } finally {
                    // Previously skipped when write() threw, leaking the storage.
                    storage.close();
                }
            } catch (IOException e) {
                log.log(Level.SEVERE, "could not write " + storable.getStorageName(), e);
                throw new OzoneInternalException(e);
            } finally {
                synchronized (storeAsyncExec) {
                    storeAsyncExec.notifyAll();
                }
            }
        }

    }


    private static final Logger log = Logger.getLogger(Serializer.class.getName());

    /** Receives new tasks; runs pass 0 (serialization). */
    private AsyncExec serializeAsyncExec;

    /** Receives tasks that finished pass 0; runs pass 1 (writing). Also used as the wait/notify monitor. */
    private AsyncExec storeAsyncExec;

    private StorageFactory storageFactory;
    private StreamFactory streamFactory;

    /**
     * Creates a serializer with its two executors.
     *
     * @param storageFactory creates the storage each image is written to
     * @param streamFactory optional wrapper for the serialization stream; when
     *        <code>null</code> the object stream writes straight to memory
     * @param name prefix for the executor names (<code>name + " serialize"</code>
     *        and <code>name + " store"</code>)
     */
    public Serializer(StorageFactory storageFactory, StreamFactory streamFactory, String name) {
        this.storageFactory = storageFactory;
        this.streamFactory = streamFactory;
        serializeAsyncExec = new AsyncExec(name + " serialize", Thread.MAX_PRIORITY, true);
        storeAsyncExec = new AsyncExec(name + " store", Thread.MAX_PRIORITY, true);
    }

    /**
     * Queues the given storable for serialization and storing.
     *
     * @param id key under which the task is queued
     * @param storable object to serialize and store
     */
    public void put(Object id, Storable storable) {
        if (log.isLoggable(Level.FINER)) {
            log.finer("enqueueing " + storable + " under id " + id);
        }
        serializeAsyncExec.put(id, new SerializeTask(id, storable));
    }

    /**
     * Removes the task queued under the given id from both executors.
     *
     * @param id key the task was queued under
     * @return the storable of the task removed from the serialize executor,
     *         or else of the task removed from the store executor, or
     *         <code>null</code> if neither held a task for that id
     */
    public Storable remove(Object id) {
        // Both removals are always performed, serialize executor first.
        SerializeTask fromSerialize = (SerializeTask) serializeAsyncExec.remove(id);
        SerializeTask fromStore = (SerializeTask) storeAsyncExec.remove(id);

        SerializeTask found = fromSerialize != null ? fromSerialize : fromStore;
        if (found == null) {
            return null;
        }
        return found.getStorable();
    }

    /**
     * @return the sum of the sizes reported by both executors
     */
    public int size() {
        return serializeAsyncExec.size() + storeAsyncExec.size();
    }

    /**
     * Forwards <code>stopWhenReady()</code> to the serialize executor and then
     * to the store executor.
     */
    public void stopWhenReady() {
        serializeAsyncExec.stopWhenReady();
        storeAsyncExec.stopWhenReady();
    }

}