1 5 package com.tc.objectserver.persistence.sleepycat; 6 7 import com.sleepycat.bind.serial.ClassCatalog; 8 import com.sleepycat.je.Cursor; 9 import com.sleepycat.je.CursorConfig; 10 import com.sleepycat.je.Database; 11 import com.sleepycat.je.DatabaseEntry; 12 import com.sleepycat.je.DatabaseException; 13 import com.sleepycat.je.LockMode; 14 import com.sleepycat.je.OperationStatus; 15 import com.tc.logging.TCLogger; 16 import com.tc.object.ObjectID; 17 import com.tc.objectserver.core.api.ManagedObject; 18 import com.tc.objectserver.core.api.ManagedObjectState; 19 import com.tc.objectserver.managedobject.MapManagedObjectState; 20 import com.tc.objectserver.persistence.api.ManagedObjectPersistor; 21 import com.tc.objectserver.persistence.api.PersistenceTransaction; 22 import com.tc.objectserver.persistence.api.PersistenceTransactionProvider; 23 import com.tc.objectserver.persistence.api.PersistentSequence; 24 import com.tc.objectserver.persistence.sleepycat.SleepycatPersistor.SleepycatPersistorBase; 25 import com.tc.text.PrettyPrinter; 26 import com.tc.util.Assert; 27 import com.tc.util.Conversion; 28 import com.tc.util.ObjectIDSet2; 29 import com.tc.util.SyncObjectIdSet; 30 import com.tc.util.SyncObjectIdSetImpl; 31 32 import java.io.IOException ; 33 import java.util.Collection ; 34 import java.util.HashMap ; 35 import java.util.HashSet ; 36 import java.util.Iterator ; 37 import java.util.Map ; 38 import java.util.Set ; 39 40 public final class ManagedObjectPersistorImpl extends SleepycatPersistorBase implements ManagedObjectPersistor { 41 private final Database objectDB; 42 private final SerializationAdapterFactory saf; 43 private final CursorConfig objectDBCursorConfig; 44 private final PersistentSequence objectIDSequence; 45 private final Database rootDB; 46 private final CursorConfig rootDBCursorConfig; 47 private long saveCount; 48 private final TCLogger logger; 49 private final PersistenceTransactionProvider ptp; 50 private final ClassCatalog classCatalog; 51 SerializationAdapter serializationAdapter; 52 private final SleepycatCollectionsPersistor collectionsPersistor; 53 54 public ManagedObjectPersistorImpl(TCLogger logger, ClassCatalog classCatalog, 55 SerializationAdapterFactory serializationAdapterFactory, Database objectDB, 56 CursorConfig objectDBCursorConfig, PersistentSequence objectIDSequence, 57 Database rootDB, CursorConfig rootDBCursorConfig, 58 PersistenceTransactionProvider ptp, 59 SleepycatCollectionsPersistor collectionsPersistor) { 60 this.logger = logger; 61 this.classCatalog = classCatalog; 62 this.saf = serializationAdapterFactory; 63 this.objectDB = objectDB; 64 this.objectDBCursorConfig = objectDBCursorConfig; 65 this.objectIDSequence = objectIDSequence; 66 this.rootDB = rootDB; 67 this.rootDBCursorConfig = rootDBCursorConfig; 68 this.ptp = ptp; 69 this.collectionsPersistor = collectionsPersistor; 70 } 71 72 public long nextObjectIDBatch(int batchSize) { 73 return objectIDSequence.nextBatch(batchSize); 74 } 75 76 public void setNextAvailableObjectID(long startID) { 77 objectIDSequence.setNext(startID); 78 } 79 80 public void addRoot(PersistenceTransaction tx, String name, ObjectID id) { 81 validateID(id); 82 OperationStatus status = null; 83 try { 84 DatabaseEntry key = new DatabaseEntry(); 85 DatabaseEntry value = new DatabaseEntry(); 86 setStringData(key, name); 87 setObjectIDData(value, id); 88 89 status = this.rootDB.put(pt2nt(tx), key, value); 90 } catch (Throwable t) { 91 throw new DBException(t); 92 } 93 if (!OperationStatus.SUCCESS.equals(status)) { throw new DBException("Unable to write root id: " + name + "=" + id 94 + "; status: " + status); } 95 } 96 97 public ObjectID loadRootID(String name) { 98 if (name == null) throw new AssertionError ("Attempt to retrieve a null root name"); 99 OperationStatus status = null; 100 try { 101 DatabaseEntry key = new DatabaseEntry(); 102 DatabaseEntry value = new DatabaseEntry(); 103 setStringData(key, name); 104 PersistenceTransaction tx = ptp.newTransaction(); 105 status = this.rootDB.get(pt2nt(tx), key, value, LockMode.DEFAULT); 106 tx.commit(); 107 if (OperationStatus.SUCCESS.equals(status)) { 108 ObjectID rv = getObjectIDData(value); 109 return rv; 110 } 111 } catch (Throwable t) { 112 throw new DBException(t); 113 } 114 if (OperationStatus.NOTFOUND.equals(status)) return ObjectID.NULL_ID; 115 else throw new DBException("Error retrieving root: " + name + "; status: " + status); 116 } 117 118 public Set loadRoots() { 119 Set rv = new HashSet (); 120 Cursor cursor = null; 121 try { 122 DatabaseEntry key = new DatabaseEntry(); 123 DatabaseEntry value = new DatabaseEntry(); 124 PersistenceTransaction tx = ptp.newTransaction(); 125 cursor = rootDB.openCursor(pt2nt(tx), rootDBCursorConfig); 126 while (OperationStatus.SUCCESS.equals(cursor.getNext(key, value, LockMode.DEFAULT))) { 127 rv.add(getObjectIDData(value)); 128 } 129 cursor.close(); 130 tx.commit(); 131 } catch (Throwable t) { 132 throw new DBException(t); 133 } 134 return rv; 135 } 136 137 public SyncObjectIdSet getAllObjectIDs() { 138 SyncObjectIdSet rv = new SyncObjectIdSetImpl(); 139 rv.startPopulating(); 140 Thread t = new Thread (new ObjectIdReader(rv), "ObjectIdReaderThread"); 141 t.setDaemon(true); 142 t.start(); 143 return rv; 144 } 145 146 public Set loadRootNames() { 147 Set rv = new HashSet (); 148 Cursor cursor = null; 149 try { 150 PersistenceTransaction tx = ptp.newTransaction(); 151 DatabaseEntry key = new DatabaseEntry(); 152 DatabaseEntry value = new DatabaseEntry(); 153 cursor = rootDB.openCursor(pt2nt(tx), rootDBCursorConfig); 154 while (OperationStatus.SUCCESS.equals(cursor.getNext(key, value, LockMode.DEFAULT))) { 155 rv.add(getStringData(key)); 156 } 157 cursor.close(); 158 tx.commit(); 159 } catch (Throwable t) { 160 throw new DBException(t); 161 } 162 return rv; 163 } 164 165 public Map loadRootNamesToIDs() { 166 Map rv = new HashMap (); 167 Cursor cursor = null; 168 try { 169 PersistenceTransaction tx = ptp.newTransaction(); 170 DatabaseEntry key = new DatabaseEntry(); 171 DatabaseEntry value = new DatabaseEntry(); 172 cursor = rootDB.openCursor(pt2nt(tx), rootDBCursorConfig); 173 while (OperationStatus.SUCCESS.equals(cursor.getNext(key, value, LockMode.DEFAULT))) { 174 rv.put(getStringData(key),getObjectIDData(value)); 175 } 176 cursor.close(); 177 tx.commit(); 178 } catch (Throwable t) { 179 throw new DBException(t); 180 } 181 return rv; 182 } 183 184 public ManagedObject loadObjectByID(ObjectID id) { 185 validateID(id); 186 OperationStatus status = null; 187 PersistenceTransaction tx = ptp.newTransaction(); 188 try { 189 DatabaseEntry key = new DatabaseEntry(); 190 DatabaseEntry value = new DatabaseEntry(); 191 setObjectIDData(key, id); 192 status = this.objectDB.get(pt2nt(tx), key, value, LockMode.DEFAULT); 193 if (OperationStatus.SUCCESS.equals(status)) { 194 ManagedObject mo = getManagedObjectData(value); 195 loadCollection(tx, mo); 196 tx.commit(); 197 return mo; 198 } 199 } catch (Throwable e) { 200 abortOnError(tx); 201 throw new DBException(e); 202 } 203 if (OperationStatus.NOTFOUND.equals(status)) return null; 204 else throw new DBException("Error retrieving object id: " + id + "; status: " + status); 205 } 206 207 private void loadCollection(PersistenceTransaction tx, ManagedObject mo) throws IOException , ClassNotFoundException , 208 DatabaseException { 209 ManagedObjectState state = mo.getManagedObjectState(); 210 if (state.getType() == ManagedObjectState.MAP_TYPE || state.getType() == ManagedObjectState.PARTIAL_MAP_TYPE) { 211 MapManagedObjectState mapState = (MapManagedObjectState) state; 212 Assert.assertNull(mapState.getMap()); 213 mapState.setMap(collectionsPersistor.loadMap(tx, mo.getID())); 214 } 215 } 216 217 public void saveObject(PersistenceTransaction persistenceTransaction, ManagedObject managedObject) { 218 Assert.assertNotNull(managedObject); 219 validateID(managedObject.getID()); 220 OperationStatus status = null; 221 try { 222 status = basicSaveObject(persistenceTransaction, managedObject); 223 } catch (DBException e) { 224 throw e; 225 } catch (Throwable t) { 226 throw new DBException("Trying to save object: " + managedObject, t); 227 } 228 229 if (!OperationStatus.SUCCESS.equals(status)) { throw new DBException("Unable to write ManagedObject: " 230 + managedObject + "; status: " + status); } 231 232 } 233 234 private OperationStatus basicSaveObject(PersistenceTransaction tx, ManagedObject managedObject) 235 throws DatabaseException, IOException { 236 if (!managedObject.isDirty()) return OperationStatus.SUCCESS; 237 OperationStatus status; 238 DatabaseEntry key = new DatabaseEntry(); 239 DatabaseEntry value = new DatabaseEntry(); 240 setObjectIDData(key, managedObject.getID()); 241 setManagedObjectData(value, managedObject); 242 status = this.objectDB.put(pt2nt(tx), key, value); 243 if (OperationStatus.SUCCESS.equals(status)) { 244 basicSaveCollection(tx, managedObject); 245 managedObject.setIsDirty(false); 246 saveCount++; 247 if (saveCount == 1 || saveCount % (100 * 1000) == 0) { 248 logger.debug("saveCount: " + saveCount); 249 } 250 } 251 return status; 252 } 253 254 private void basicSaveCollection(PersistenceTransaction tx, ManagedObject managedObject) throws IOException , 255 DatabaseException { 256 ManagedObjectState state = managedObject.getManagedObjectState(); 257 if (state.getType() == ManagedObjectState.MAP_TYPE || state.getType() == ManagedObjectState.PARTIAL_MAP_TYPE) { 258 MapManagedObjectState mapState = (MapManagedObjectState) state; 259 SleepycatPersistableMap map = (SleepycatPersistableMap) mapState.getMap(); 260 collectionsPersistor.saveMap(tx, map); 261 } 262 } 263 264 public void saveAllObjects(PersistenceTransaction persistenceTransaction, Collection managedObjects) { 265 long t0 = System.currentTimeMillis(); 266 if (managedObjects.isEmpty()) return; 267 Object failureContext = null; 268 try { 269 for (Iterator i = managedObjects.iterator(); i.hasNext();) { 270 final ManagedObject managedObject = (ManagedObject) i.next(); 271 272 final OperationStatus status = basicSaveObject(persistenceTransaction, managedObject); 273 274 if (!OperationStatus.SUCCESS.equals(status)) { 275 failureContext = new Object () { 276 public String toString() { 277 return "Unable to save ManagedObject: " + managedObject + "; status: " + status; 278 } 279 }; 280 break; 281 } 282 } 283 } catch (Throwable t) { 284 throw new DBException(t); 285 } 286 287 if (failureContext != null) throw new DBException(failureContext.toString()); 288 289 long delta = System.currentTimeMillis() - t0; 290 saveAllElapsed += delta; 291 saveAllCount++; 292 saveAllObjectCount += managedObjects.size(); 293 if (saveAllCount % (100 * 1000) == 0) { 294 double avg = ((double) saveAllObjectCount / (double) saveAllElapsed) * 1000; 295 logger.debug("save time: " + delta + ", " + managedObjects.size() + " objects; avg: " + avg + "/sec"); 296 } 297 } 298 299 private long saveAllCount = 0; 300 private long saveAllObjectCount = 0; 301 private long saveAllElapsed = 0; 302 303 private void deleteObjectByID(PersistenceTransaction tx, ObjectID id) { 304 validateID(id); 305 try { 306 DatabaseEntry key = new DatabaseEntry(); 307 setObjectIDData(key, id); 308 OperationStatus status = this.objectDB.delete(pt2nt(tx), key); 309 if (!(OperationStatus.NOTFOUND.equals(status) || OperationStatus.SUCCESS.equals(status))) { 310 throw new DBException("Unable to remove ManagedObject for object id: " + id + ", status: " + status); 312 } else { 313 collectionsPersistor.deleteCollection(tx, id); 314 } 315 } catch (DatabaseException t) { 316 throw new DBException(t); 317 } 318 } 319 320 public void deleteAllObjectsByID(PersistenceTransaction tx, Collection objectIDs) { 321 for (Iterator i = objectIDs.iterator(); i.hasNext();) { 322 deleteObjectByID(tx, (ObjectID) i.next()); 323 } 324 } 325 326 329 SerializationAdapter getSerializationAdapter() throws IOException { 330 if (serializationAdapter == null) serializationAdapter = saf.newAdapter(this.classCatalog); 332 return serializationAdapter; 333 } 334 335 338 339 private void validateID(ObjectID id) { 340 Assert.assertNotNull(id); 341 Assert.eval(!ObjectID.NULL_ID.equals(id)); 342 } 343 344 private void setObjectIDData(DatabaseEntry entry, ObjectID objectID) { 345 entry.setData(Conversion.long2Bytes(objectID.toLong())); 346 } 347 348 private void setStringData(DatabaseEntry entry, String string) throws IOException { 349 getSerializationAdapter().serializeString(entry, string); 350 } 351 352 private void setManagedObjectData(DatabaseEntry entry, ManagedObject mo) throws IOException { 353 getSerializationAdapter().serializeManagedObject(entry, mo); 354 } 355 356 private ObjectID getObjectIDData(DatabaseEntry entry) { 357 return new ObjectID(Conversion.bytes2Long(entry.getData())); 358 } 359 360 private String getStringData(DatabaseEntry entry) throws IOException , ClassNotFoundException { 361 return getSerializationAdapter().deserializeString(entry); 362 } 363 364 private ManagedObject getManagedObjectData(DatabaseEntry entry) throws IOException , ClassNotFoundException { 365 return getSerializationAdapter().deserializeManagedObject(entry); 366 } 367 368 public void prettyPrint(PrettyPrinter out) { 369 out.println(this.getClass().getName()); 370 out = out.duplicateAndIndent(); 371 out.println("db: " + objectDB); 372 } 373 374 class ObjectIdReader implements Runnable { 375 private final SyncObjectIdSet set; 376 377 public ObjectIdReader(SyncObjectIdSet set) { 378 this.set = set; 379 } 380 381 public void run() { 382 ObjectIDSet2 tmp = new ObjectIDSet2(); 383 PersistenceTransaction tx = null; 384 Cursor cursor = null; 385 try { 386 tx = ptp.newTransaction(); 387 cursor = objectDB.openCursor(pt2nt(tx), objectDBCursorConfig); 388 DatabaseEntry key = new DatabaseEntry(); 389 DatabaseEntry value = new DatabaseEntry(); 390 while (OperationStatus.SUCCESS.equals(cursor.getNext(key, value, LockMode.DEFAULT))) { 391 tmp.add(new ObjectID(Conversion.bytes2Long(key.getData()))); 392 } 393 } catch (Throwable t) { 394 logger.error("Error Reading Object IDs", t); 395 } finally { 396 safeClose(cursor); 397 safeCommit(tx); 398 set.stopPopulating(tmp); 399 tmp = null; 400 } 401 } 402 private void safeCommit(PersistenceTransaction tx) { 403 if (tx == null) return; 404 try { 405 tx.commit(); 406 } catch (Throwable t) { 407 logger.error("Error Committing Transaction", t); 408 } 409 } 410 private void safeClose(Cursor c) { 411 if (c == null)return; 412 413 try { 414 c.close(); 415 } catch (DatabaseException e) { 416 logger.error("Error closing cursor", e); 417 } 418 } 419 } 420 } 421 | Popular Tags |