1 package je.txn; 2 3 import java.util.Random ; 4 5 import com.sleepycat.bind.EntryBinding; 6 import com.sleepycat.bind.serial.SerialBinding; 7 import com.sleepycat.bind.serial.StoredClassCatalog; 8 import com.sleepycat.bind.tuple.StringBinding; 9 import com.sleepycat.je.Cursor; 10 import com.sleepycat.je.CursorConfig; 11 import com.sleepycat.je.Database; 12 import com.sleepycat.je.DatabaseEntry; 13 import com.sleepycat.je.DatabaseException; 14 import com.sleepycat.je.DeadlockException; 15 import com.sleepycat.je.Environment; 16 import com.sleepycat.je.LockMode; 17 import com.sleepycat.je.OperationStatus; 18 import com.sleepycat.je.Transaction; 19 20 public class DBWriter extends Thread 21 { 22 private Database myDb = null; 23 private Environment myEnv = null; 24 private EntryBinding dataBinding = null; 25 private Random generator = new Random (); 26 27 private static int MAX_RETRY = 20; 28 29 private static String [] keys = {"key 1", "key 2", "key 3", 30 "key 4", "key 5", "key 6", 31 "key 7", "key 8", "key 9", 32 "key 10"}; 33 34 35 DBWriter(Environment env, Database db, StoredClassCatalog scc) 37 throws DatabaseException { 38 myDb = db; 39 myEnv = env; 40 dataBinding = new SerialBinding(scc, PayloadData.class); 41 } 42 43 44 public void run () { 48 Transaction txn = null; 49 50 for (int i=0; i<50; i++) { 52 53 boolean retry = true; 54 int retry_count = 0; 55 while (retry) { 57 try { 60 61 txn = myEnv.beginTransaction(null, null); 63 64 for (int j = 0; j < 10; j++) { 67 DatabaseEntry key = new DatabaseEntry(); 69 StringBinding.stringToEntry(keys[j], key); 70 71 PayloadData pd = new PayloadData(i+j, getName(), 73 generator.nextDouble()); 74 DatabaseEntry data = new DatabaseEntry(); 75 dataBinding.objectToEntry(pd, data); 76 77 myDb.put(txn, key, data); 79 } 80 81 System.out.println(getName() + " : committing txn : " + i); 83 84 System.out.println(getName() + " : Found " + 85 countRecords(null) + " records in the database."); 86 try { 87 txn.commit(); 88 txn = null; 89 } catch (DatabaseException e) { 90 System.err.println("Error on txn commit: " + 91 e.toString()); 92 } 93 retry = false; 94 95 } catch (DeadlockException de) { 96 System.out.println("################# " + getName() + 97 " : caught deadlock"); 98 if (retry_count < MAX_RETRY) { 100 System.err.println(getName() + 101 " : Retrying operation."); 102 retry = true; 103 retry_count++; 104 } else { 105 System.err.println(getName() + 106 " : out of retries. Giving up."); 107 retry = false; 108 } 109 } catch (DatabaseException e) { 110 retry = false; 112 System.err.println(getName() + 113 " : caught exception: " + e.toString()); 114 e.printStackTrace(); 115 } finally { 116 if (txn != null) { 117 try { 118 txn.abort(); 119 } catch (Exception e) { 120 System.err.println("Error aborting transaction: " + 121 e.toString()); 122 e.printStackTrace(); 123 } 124 } 125 } 126 } 127 } 128 } 129 130 private int countRecords(Transaction txn) throws DatabaseException { 146 DatabaseEntry key = new DatabaseEntry(); 147 DatabaseEntry data = new DatabaseEntry(); 148 int count = 0; 149 Cursor cursor = null; 150 151 try { 152 CursorConfig cc = new CursorConfig(); 154 cc.setReadUncommitted(true); 155 cursor = myDb.openCursor(txn, cc); 156 while (cursor.getNext(key, data, LockMode.DEFAULT) == 157 OperationStatus.SUCCESS) { 158 159 count++; 160 } 161 } finally { 162 if (cursor != null) { 163 cursor.close(); 164 } 165 } 166 167 return count; 168 169 } 170 } 171 | Popular Tags |