package com.daffodilwoods.daffodildb.server.datasystem.utility;

import java.util.*;

/**
 * Polling lock manager offering one table-wide exclusive lock plus per-row
 * read and write locks keyed by an arbitrary row identity object.
 *
 * <p>All state transitions happen while holding {@link #monitor}. Threads that
 * cannot acquire a lock park on the relevant lock object with a 5 ms timed
 * wait and then re-evaluate, so a missed notification only delays a waiter by
 * one polling interval.
 *
 * <p>Row read locks are re-entrant per thread (a hold count is kept for each
 * thread). The table lock and row write locks are not re-entrant: a thread
 * requesting one it already holds keeps polling.
 *
 * <p>Acquire methods do not abort on interruption; they keep waiting and
 * restore the caller's interrupt status before returning.
 */
public class ReentrantReadWriteLocker implements ReadWriteLock {

  /**
   * True while some thread holds the table lock. Written only under
   * {@link #monitor}; volatile because the acquire loops also poll it
   * without holding the monitor.
   */
  volatile boolean tableLocked = false;

  /** Wait/notify target and waiter bookkeeping for the table lock. */
  TableLock tableLock = new TableLock();

  /** Row identity -> {@code RowLock} for every row currently locked. */
  Map map = Collections.synchronizedMap(new HashMap());

  /** Guards every acquire/release state transition. */
  Object monitor = new Object();

  /** Bookkeeping for the table lock; also the object table waiters park on. */
  private static class TableLock {
    /** Thread that currently holds the table lock, or null. */
    Object owner;
    /** Polling rounds recorded by threads waiting for the table lock. */
    int waitingThreads = 0;

    synchronized void incrementWaiters() {
      ++waitingThreads;
    }

    synchronized void decrementWaitersToZero() {
      waitingThreads = 0;
    }

    synchronized int waitersCount() {
      return waitingThreads;
    }
  }

  /** Bookkeeping for one locked row; also the object row waiters park on. */
  private static class RowLock {
    /**
     * Thread holding the row for writing, or null when the row is held for
     * reading only. Volatile because lockRowForRead inspects it without
     * holding the monitor.
     */
    volatile Object owner;
    /** Thread -> Integer hold count. A writer records a hold here as well. */
    private Map readOwner = new HashMap();
    /** Polling rounds recorded by threads waiting on this row. */
    private int waitingThreads;

    /** Adds one hold for the calling thread. */
    synchronized void increment() {
      Thread current = Thread.currentThread();
      Integer holds = (Integer) readOwner.get(current);
      readOwner.put(current, new Integer(holds == null ? 1 : holds.intValue() + 1));
    }

    /**
     * Removes one hold of the calling thread.
     *
     * @throws IllegalMonitorStateException if the calling thread has no hold
     */
    synchronized void decrement() {
      Thread current = Thread.currentThread();
      Integer holds = (Integer) readOwner.get(current);
      if (holds == null) {
        throw new IllegalMonitorStateException(
            "Current thread holds no lock on this row");
      }
      if (holds.intValue() == 1) {
        readOwner.remove(current);
      } else {
        readOwner.put(current, new Integer(holds.intValue() - 1));
      }
    }

    /** Returns true when the calling thread already has a hold on this row. */
    synchronized boolean allowReader() {
      return readOwner.containsKey(Thread.currentThread());
    }

    /** Returns the number of distinct threads that have a hold on this row. */
    synchronized int readersCount() {
      return readOwner.size();
    }

    synchronized void incrementWaiters() {
      ++waitingThreads;
    }

    synchronized void decrementWaitersToZero() {
      waitingThreads = 0;
    }

    synchronized int waitingThreads() {
      return waitingThreads;
    }
  }

  public ReentrantReadWriteLocker() {
  }

  /**
   * Parks the caller on the table lock for at most 5 ms.
   *
   * @return true if the wait was cut short by an interrupt; the interrupt
   *         status has been cleared and the caller must restore it later
   */
  private boolean waitOnTableLock() {
    synchronized (tableLock) {
      try {
        tableLock.wait(5);
        return false;
      } catch (InterruptedException ie) {
        return true;
      }
    }
  }

  /** Wakes every thread parked on the table lock. */
  private void notifyOnTableLock() {
    synchronized (tableLock) {
      tableLock.notifyAll();
    }
  }

  /**
   * Parks the caller on the given row lock for at most 5 ms.
   *
   * @return true if the wait was cut short by an interrupt; the interrupt
   *         status has been cleared and the caller must restore it later
   */
  private boolean waitOnRowLock(RowLock rw) {
    synchronized (rw) {
      try {
        rw.wait(5);
        return false;
      } catch (InterruptedException ie) {
        return true;
      }
    }
  }

  /** Wakes every thread parked on the given row lock. */
  private void notifyOnRowLock(RowLock rw) {
    synchronized (rw) {
      rw.notifyAll();
    }
  }

  /**
   * Acquires the exclusive table lock, waiting until no other thread holds it
   * and no row lock is registered. While rows are still locked the caller
   * registers as a table waiter, which makes new row requests back off.
   */
  public void lockTable() {
    boolean interrupted = false;
    try {
      for (;;) {
        if (tableLocked) {
          interrupted |= waitOnTableLock();
        } else if (map.size() > 0) {
          tableLock.incrementWaiters();
          interrupted |= waitOnTableLock();
        } else {
          synchronized (monitor) {
            // Re-check both conditions under the monitor: another thread may
            // have taken the table lock or a row lock since the checks above.
            if (!tableLocked && map.size() == 0) {
              tableLocked = true;
              tableLock.decrementWaitersToZero();
              tableLock.owner = Thread.currentThread();
              return;
            }
          }
        }
      }
    } finally {
      // Interrupts are deferred, not dropped: re-asserting inside the loop
      // would make every following wait() throw immediately.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Releases the table lock and wakes all threads parked on it. */
  public void releaseTable() {
    synchronized (monitor) {
      tableLock.owner = null;
      tableLocked = false;

      notifyOnTableLock();
    }
  }

  /**
   * Acquires a read lock on the given row.
   *
   * <p>A thread that already has a hold on the row (as reader or as its
   * writer) re-enters immediately. Otherwise the caller waits while the table
   * is locked, while threads are waiting for the table lock, while the row
   * has a writer, or while other threads are recorded as waiting on the row.
   *
   * @param rowIdentity key identifying the row
   */
  public void lockRowForRead(Object rowIdentity) {
    boolean interrupted = false;
    try {
      for (;;) {
        RowLock rw = (RowLock) map.get(rowIdentity);
        if (rw != null) {
          if (rw.owner == Thread.currentThread() || rw.allowReader()) {
            // Re-entrant acquisition by a thread that already holds the row.
            synchronized (monitor) {
              rw.increment();
              map.put(rowIdentity, rw);
              return;
            }
          } else if (tableLock.waitersCount() > 0) {
            // A table-lock request is pending; do not add new row holders.
            interrupted |= waitOnTableLock();
          } else if (rw.owner == null && rw.waitingThreads() == 0) {
            synchronized (monitor) {
              if (tableLocked == false) {
                // Re-read under the monitor; the entry may have been replaced.
                rw = (RowLock) map.get(rowIdentity);
                if (rw != null && rw.owner == null && rw.waitingThreads() == 0) {
                  rw.increment();
                  map.put(rowIdentity, rw);
                  return;
                }
              }
            }
          } else {
            rw.incrementWaiters();
            interrupted |= waitOnRowLock(rw);
          }
        } else if (tableLocked || tableLock.waitersCount() > 0) {
          interrupted |= waitOnTableLock();
        } else {
          synchronized (monitor) {
            if (tableLocked == false) {
              rw = (RowLock) map.get(rowIdentity);
              rw = rw == null ? new RowLock() : rw;
              if (rw.owner == null) {
                rw.increment();
                map.put(rowIdentity, rw);
                return;
              }
            }
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Releases one read hold of the calling thread on the given row. When the
   * last hold of the last thread is released the row entry is removed and
   * table and row waiters are woken.
   *
   * @param rowIdentity key identifying the row
   * @throws IllegalMonitorStateException if the row is not locked or the
   *         calling thread has no hold on it
   */
  public void releaseRowForRead(Object rowIdentity) {
    synchronized (monitor) {
      RowLock rw = (RowLock) map.get(rowIdentity);
      if (rw == null) {
        throw new IllegalMonitorStateException(
            "Row is not locked: " + rowIdentity);
      }
      rw.decrement();

      if (rw.readersCount() == 0) {
        map.remove(rowIdentity);

        if (tableLock.waitersCount() > 0) {
          notifyOnTableLock();
        }

        if (rw.waitingThreads() > 0) {
          notifyOnRowLock(rw);
        }
      }
    }
  }

  /**
   * Acquires the write lock on the given row. The lock is granted only when
   * no entry exists for the row, the table is not locked and no thread is
   * waiting for the table lock; otherwise the caller keeps polling.
   *
   * @param rowIdentity key identifying the row
   */
  public void lockRowForWrite(Object rowIdentity) {
    boolean interrupted = false;
    try {
      for (;;) {
        RowLock rw = (RowLock) map.get(rowIdentity);
        if (rw != null) {
          if (tableLock.waitersCount() > 0) {
            interrupted |= waitOnTableLock();
          } else {
            rw.incrementWaiters();
            interrupted |= waitOnRowLock(rw);
          }
        } else if (tableLocked || tableLock.waitersCount() > 0) {
          interrupted |= waitOnTableLock();
        } else {
          synchronized (monitor) {
            if (tableLocked == false) {
              rw = (RowLock) map.get(rowIdentity);
              rw = rw == null ? new RowLock() : rw;
              if (rw.readersCount() == 0) {
                rw.increment();
                rw.owner = Thread.currentThread();
                map.put(rowIdentity, rw);
                return;
              }
            }
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Releases the write lock on the given row and wakes table and row waiters.
   *
   * <p>NOTE(review): the row entry is removed unconditionally, even if the
   * calling thread still has re-entrant read holds on it. Release those read
   * holds first; confirm that callers follow this order.
   *
   * @param rowIdentity key identifying the row
   * @throws IllegalMonitorStateException if the row is not locked or the
   *         calling thread has no hold on it
   */
  public void releaseRowForWrite(Object rowIdentity) {
    synchronized (monitor) {
      RowLock rw = (RowLock) map.get(rowIdentity);
      if (rw == null) {
        throw new IllegalMonitorStateException(
            "Row is not locked: " + rowIdentity);
      }
      rw.decrement();

      map.remove(rowIdentity);

      if (tableLock.waitersCount() > 0) {
        notifyOnTableLock();
      }

      if (rw.waitingThreads() > 0) {
        notifyOnRowLock(rw);
      }
    }
  }
}