package org.objectweb.cjdbc.controller.scheduler.raidb1;

import java.sql.SQLException;

import org.objectweb.cjdbc.common.exceptions.RollbackException;
import org.objectweb.cjdbc.common.sql.AbstractRequest;
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
import org.objectweb.cjdbc.common.sql.SelectRequest;
import org.objectweb.cjdbc.common.sql.StoredProcedure;
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;

/**
 * Query-level scheduler for RAIDb-1.
 * <p>
 * A read is scheduled immediately when <code>getPendingWrites()</code> is 0;
 * otherwise it waits on the write monitor. A write or stored procedure is
 * scheduled only when no read is pending; otherwise it waits on the read
 * monitor and then retries. Request timeouts are treated as seconds (they are
 * multiplied by 1000 before being handed to <code>Object.wait(long)</code>).
 */
public class RAIDb1QueryLevelScheduler extends AbstractScheduler
{

  /** Next identifier handed out to a scheduled request. */
  private long         requestId;

  /** Number of reads scheduled and not yet completed; guarded by readSync. */
  private int          pendingReads;

  /** Monitor guarding pendingReads; writers wait on it for reads to drain. */
  private final Object readSync;

  /** Monitor on which readers wait until the last write has completed. */
  private final Object writeSync;

  /**
   * Creates a new scheduler with no pending read and request ids starting at
   * 0. No request parsing is requested from the super class.
   */
  public RAIDb1QueryLevelScheduler()
  {
    super(RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING);
    requestId = 0;
    pendingReads = 0;
    readSync = new Object();
    writeSync = new Object();
  }

  /**
   * Schedules a read request. The request is scheduled right away when no
   * write is pending; otherwise the caller blocks until write completion is
   * notified or the request timeout elapses.
   *
   * @param request the read request to schedule; its id is assigned here and
   *          its timeout is reduced by the time spent waiting
   * @throws SQLException if the timeout expires or the waiting thread is
   *           interrupted
   */
  public void scheduleReadRequest(SelectRequest request) throws SQLException
  {
    // Lock order is always writeSync first, then readSync.
    synchronized (this.writeSync)
    {
      if (getPendingWrites() == 0)
      {
        synchronized (this.readSync)
        {
          request.setId(requestId++);
          pendingReads++;
          if (logger.isDebugEnabled())
            logger.debug("Request "
                + request.getId()
                + (request.isAutoCommit() ? "" : " transaction "
                    + request.getTransactionId()) + " scheduled for read ("
                + pendingReads + " pending reads)");
          return;
        }
      }

      // Writes are pending: wait for the last one to complete.
      try
      {
        if (logger.isDebugEnabled())
          logger.debug("Request " + request.getId() + " waiting for "
              + getPendingWrites() + " pending writes)");

        int timeout = request.getTimeout();
        if (timeout > 0)
        {
          long start = System.currentTimeMillis();
          // Seconds to milliseconds; 1000L avoids int overflow.
          long lTimeout = timeout * 1000L;
          this.writeSync.wait(lTimeout);
          long end = System.currentTimeMillis();
          long remainingMs = lTimeout - (end - start);
          if (remainingMs > 0)
            // Back to seconds, rounded up so a positive remainder never
            // becomes 0 (which the test above treats as "no timeout").
            request.setTimeout((int) ((remainingMs + 999) / 1000));
          else
          {
            String msg = "Timeout (" + request.getTimeout() + ") for request: "
                + request.getId();
            logger.warn(msg);
            throw new SQLException(msg);
          }
        }
        else
          this.writeSync.wait();

        // NOTE(review): pending writes are not re-checked after wake-up, so a
        // spurious wake-up would let the read through - confirm whether a
        // retry loop is wanted here.
        synchronized (this.readSync)
        {
          request.setId(requestId++);
          pendingReads++;
          if (logger.isDebugEnabled())
            logger.debug("Request " + request.getId() + " scheduled for read ("
                + pendingReads + " pending reads)");
          return;
        }
      }
      catch (InterruptedException e)
      {
        // Restore the interrupt status so callers can still observe it.
        Thread.currentThread().interrupt();
        if (logger.isWarnEnabled())
          logger.warn("Request " + request.getId() + " timed out ("
              + request.getTimeout() + " s)");
        SQLException sqle = new SQLException("Timeout ("
            + request.getTimeout() + ") for request: " + request.getId());
        sqle.initCause(e);
        throw sqle;
      }
    }
  }

  /**
   * Records the completion of a read request and wakes up the threads waiting
   * on the read monitor once no read is pending anymore.
   *
   * @param request the read request that completed
   */
  public final void readCompletedNotify(SelectRequest request)
  {
    synchronized (this.readSync)
    {
      pendingReads--;
      if (logger.isDebugEnabled())
        logger.debug("Read request " + request.getId() + " completed - "
            + pendingReads + " pending reads");
      if (pendingReads == 0)
      {
        if (logger.isDebugEnabled())
          logger.debug("Last read completed, notifying writes");
        readSync.notifyAll();
      }
    }
  }

  /**
   * Schedules a write request. The request gets its id as soon as no read is
   * pending; otherwise the caller waits for reads to complete and retries.
   *
   * @param request the write request to schedule
   * @throws SQLException if the timeout expires or the waiting thread is
   *           interrupted
   */
  public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request)
      throws SQLException
  {
    synchronized (this.writeSync)
    {
      synchronized (this.readSync)
      {
        if (pendingReads == 0)
        {
          request.setId(requestId++);
          if (logger.isDebugEnabled())
            logger.debug("Request "
                + request.getId()
                + (request.isAutoCommit() ? "" : " transaction "
                    + request.getTransactionId()) + " scheduled for write ("
                + getPendingWrites() + " pending writes)");
          return;
        }
      }
    }

    // Both monitors are released here; wait for reads, then try again.
    waitForReadCompletion(request);
    scheduleNonSuspendedWriteRequest(request);
  }

  /**
   * Called when a write request has completed. Wakes up waiting readers when
   * <code>getPendingWrites()</code> reports 0.
   *
   * @param request the write request that completed
   */
  public final synchronized void notifyWriteCompleted(
      AbstractWriteRequest request)
  {
    synchronized (this.writeSync)
    {
      if (logger.isDebugEnabled())
        logger.debug("Request " + request.getId() + " completed - "
            + getPendingWrites() + " pending writes");
      if (getPendingWrites() == 0)
      {
        if (logger.isDebugEnabled())
          logger.debug("Last write completed, notifying reads");
        writeSync.notifyAll();
      }
    }
  }

  /**
   * Schedules a stored procedure the same way as a write request: it gets its
   * id once no read is pending, otherwise the caller waits and retries.
   * <p>
   * NOTE(review): this method is synchronized on the scheduler instance and
   * keeps that monitor while waiting for reads, which blocks
   * {@link #notifyWriteCompleted(AbstractWriteRequest)} in the meantime;
   * {@link #scheduleNonSuspendedWriteRequest(AbstractWriteRequest)} is not
   * synchronized - confirm whether the difference is intended.
   *
   * @param proc the stored procedure to schedule
   * @throws SQLException if the timeout expires or the waiting thread is
   *           interrupted
   * @throws RollbackException declared by the signature; not thrown by the
   *           code of this method itself
   */
  public final synchronized void scheduleNonSuspendedStoredProcedure(
      StoredProcedure proc) throws SQLException, RollbackException
  {
    synchronized (this.writeSync)
    {
      synchronized (this.readSync)
      {
        if (pendingReads == 0)
        {
          proc.setId(requestId++);
          if (logger.isDebugEnabled())
            logger.debug("Stored procedure "
                + proc.getId()
                + (proc.isAutoCommit() ? "" : " transaction "
                    + proc.getTransactionId()) + " scheduled for write ("
                + getPendingWrites() + " pending writes)");
          return;
        }
      }
    }

    waitForReadCompletion(proc);
    scheduleNonSuspendedStoredProcedure(proc);
  }

  /**
   * Called when a stored procedure has completed. Wakes up waiting readers
   * when <code>getPendingWrites()</code> reports 0.
   *
   * @param proc the stored procedure that completed
   */
  public final void notifyStoredProcedureCompleted(StoredProcedure proc)
  {
    synchronized (this.writeSync)
    {
      if (logger.isDebugEnabled())
        logger.debug("Stored procedure " + proc.getId() + " completed - "
            + getPendingWrites() + " pending writes");
      if (getPendingWrites() == 0)
      {
        if (logger.isDebugEnabled())
          logger.debug("Last write completed, notifying reads");
        writeSync.notifyAll();
      }
    }
  }

  /**
   * Blocks until read completion is notified on the read monitor or the
   * request timeout elapses. Returns immediately when no read is pending.
   * The caller is expected to re-check the pending reads afterwards.
   *
   * @param request the request that is waiting; its timeout is reduced by the
   *          time spent waiting
   * @throws SQLException if the timeout expires or the waiting thread is
   *           interrupted
   */
  private void waitForReadCompletion(AbstractRequest request)
      throws SQLException
  {
    synchronized (this.readSync)
    {
      // The callers released readSync between their own check and this call.
      // If the last read completed in between, its notifyAll() is already
      // gone and waiting here would block for the whole timeout (or with no
      // time limit when the timeout is 0).
      if (pendingReads == 0)
        return;

      try
      {
        if (logger.isDebugEnabled())
          logger.debug("Request " + request.getId() + " waiting for "
              + pendingReads + " pending reads)");

        int timeout = request.getTimeout();
        if (timeout > 0)
        {
          long start = System.currentTimeMillis();
          // Seconds to milliseconds; 1000L avoids int overflow.
          long lTimeout = timeout * 1000L;
          this.readSync.wait(lTimeout);
          long end = System.currentTimeMillis();
          long remainingMs = lTimeout - (end - start);
          if (remainingMs > 0)
            // Back to seconds, rounded up so a positive remainder never
            // becomes 0 (which the test above treats as "no timeout").
            request.setTimeout((int) ((remainingMs + 999) / 1000));
          else
          {
            String msg = "Timeout (" + request.getTimeout() + ") for request: "
                + request.getId();
            logger.warn(msg);
            throw new SQLException(msg);
          }
        }
        else
          this.readSync.wait();
      }
      catch (InterruptedException e)
      {
        // Restore the interrupt status so callers can still observe it.
        Thread.currentThread().interrupt();
        if (logger.isWarnEnabled())
          logger.warn("Request " + request.getId() + " timed out ("
              + request.getTimeout() + " s)");
        SQLException sqle = new SQLException("Timeout ("
            + request.getTimeout() + ") for request: " + request.getId());
        sqle.initCause(e);
        throw sqle;
      }
    }
  }

  /**
   * Does nothing in this scheduler.
   *
   * @param transactionId id of the transaction being committed
   */
  protected final void commitTransaction(long transactionId)
  {
  }

  /**
   * Does nothing in this scheduler.
   *
   * @param transactionId id of the transaction being rolled back
   */
  protected final void rollbackTransaction(long transactionId)
  {
  }

  /**
   * Does nothing in this scheduler.
   *
   * @param transactionId id of the transaction being rolled back
   * @param savepointName name of the savepoint to roll back to
   */
  protected final void rollbackTransaction(long transactionId,
      String savepointName)
  {
  }

  /**
   * Does nothing in this scheduler.
   *
   * @param transactionId id of the transaction setting the savepoint
   * @param name name of the savepoint
   */
  protected final void setSavepointTransaction(long transactionId, String name)
  {
  }

  /**
   * Does nothing in this scheduler.
   *
   * @param transactionId id of the transaction releasing the savepoint
   * @param name name of the savepoint
   */
  protected final void releaseSavepointTransaction(long transactionId,
      String name)
  {
  }

  /**
   * Builds the XML element describing this scheduler: a RAIDb-1 scheduler
   * element whose level attribute is set to the query-level value.
   *
   * @return the XML fragment as a string
   */
  public String getXmlImpl()
  {
    return "<" + DatabasesXmlTags.ELT_RAIDb1Scheduler + " "
        + DatabasesXmlTags.ATT_level + "=\"" + DatabasesXmlTags.VAL_query
        + "\"/>";
  }
}