1 24 25 package org.objectweb.cjdbc.controller.scheduler.raidb2; 26 27 import java.sql.SQLException ; 28 29 import org.objectweb.cjdbc.common.exceptions.RollbackException; 30 import org.objectweb.cjdbc.common.sql.AbstractRequest; 31 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 32 import org.objectweb.cjdbc.common.sql.ParsingGranularities; 33 import org.objectweb.cjdbc.common.sql.SelectRequest; 34 import org.objectweb.cjdbc.common.sql.StoredProcedure; 35 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 36 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 37 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler; 38 39 50 public class RAIDb2QueryLevelScheduler extends AbstractScheduler 51 { 52 53 63 private long requestId; 64 private int pendingReads; 65 66 private Object readSync; private Object writeSync; 71 75 78 public RAIDb2QueryLevelScheduler() 79 { 80 super(RAIDbLevels.RAIDb2, ParsingGranularities.NO_PARSING); 81 requestId = 0; 82 pendingReads = 0; 83 readSync = new Object (); 84 writeSync = new Object (); 85 } 86 87 91 94 public void scheduleReadRequest(SelectRequest request) throws SQLException 95 { 96 synchronized (this.writeSync) 98 { 99 if (getPendingWrites() == 0) 100 { synchronized (this.readSync) 102 { 103 request.setId(requestId++); 104 pendingReads++; 105 if (logger.isDebugEnabled()) 106 logger.debug("Request " + request.getId() + " scheduled for read (" 107 + pendingReads + " pending reads)"); 108 return; 109 } 110 } 111 112 try 114 { 115 if (logger.isDebugEnabled()) 116 logger.debug("Request " + request.getId() + " waiting for " 117 + getPendingWrites() + " pending writes)"); 118 119 int timeout = request.getTimeout(); 120 if (timeout > 0) 121 { 122 long start = System.currentTimeMillis(); 123 long lTimeout = timeout * 1000; 125 this.writeSync.wait(lTimeout); 126 long end = System.currentTimeMillis(); 127 int remaining = (int) (lTimeout - (end - start)); 128 if (remaining > 0) 129 request.setTimeout(remaining); 130 else 131 { 132 String msg = "Timeout (" + request.getTimeout() + ") for request: " 133 + request.getId(); 134 logger.warn(msg); 135 throw new SQLException (msg); 136 } 137 } 138 else 139 this.writeSync.wait(); 140 141 synchronized (this.readSync) 142 { 143 request.setId(requestId++); 144 pendingReads++; 145 if (logger.isDebugEnabled()) 146 logger.debug("Request " + request.getId() + " scheduled for read (" 147 + pendingReads + " pending reads)"); 148 return; } 150 } 151 catch (InterruptedException e) 152 { 153 if (logger.isWarnEnabled()) 155 logger.warn("Request " + request.getId() + " timed out (" 156 + request.getTimeout() + " s)"); 157 throw new SQLException ("Timeout (" + request.getTimeout() 158 + ") for request: " + request.getId()); 159 } 160 } 161 } 162 163 166 public final void readCompletedNotify(SelectRequest request) 167 { 168 synchronized (this.readSync) 169 { 170 pendingReads--; 171 if (logger.isDebugEnabled()) 172 logger.debug("Request " + request.getId() + " completed"); 173 if (pendingReads == 0) 174 { 175 if (logger.isDebugEnabled()) 176 logger.debug("Last read completed, notifying writes"); 177 readSync.notifyAll(); } 179 } 180 } 181 182 185 public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request) 186 throws SQLException 187 { 188 synchronized (this.writeSync) 191 { 192 synchronized (this.readSync) 193 { 194 if (pendingReads == 0) 195 { request.setId(requestId++); 197 if (logger.isDebugEnabled()) 198 logger.debug("Request " + request.getId() 199 + " scheduled for write (" + getPendingWrites() 200 + " pending writes)"); 201 return; 202 } 203 } 204 } 205 206 waitForReadCompletion(request); 207 scheduleNonSuspendedWriteRequest(request); 208 } 209 210 213 public final synchronized void notifyWriteCompleted( 214 AbstractWriteRequest request) 215 { 216 synchronized (this.writeSync) 217 { 218 if (logger.isDebugEnabled()) 219 logger.debug("Request " + request.getId() + " completed"); 220 if (getPendingWrites() == 0) 221 { 222 if (logger.isDebugEnabled()) 223 logger.debug("Last write completed, notifying reads"); 224 writeSync.notifyAll(); } 226 } 227 } 228 229 232 public final synchronized void scheduleNonSuspendedStoredProcedure( 233 StoredProcedure proc) throws SQLException , RollbackException 234 { 235 synchronized (this.writeSync) 238 { 239 synchronized (this.readSync) 240 { 241 if (pendingReads == 0) 242 { proc.setId(requestId++); 244 if (logger.isDebugEnabled()) 245 logger.debug("Stored procedure " 246 + proc.getId() 247 + (proc.isAutoCommit() ? "" : " transaction " 248 + proc.getTransactionId()) + " scheduled for write (" 249 + getPendingWrites() + " pending writes)"); 250 return; 251 } 252 } 253 } 254 255 waitForReadCompletion(proc); 256 scheduleNonSuspendedStoredProcedure(proc); 257 } 258 259 262 public final void notifyStoredProcedureCompleted(StoredProcedure proc) 263 { 264 synchronized (this.writeSync) 265 { 266 if (logger.isDebugEnabled()) 267 logger.debug("Stored procedure " + proc.getId() + " completed - " 268 + getPendingWrites() + " pending writes"); 269 if (getPendingWrites() == 0) 270 { 271 if (logger.isDebugEnabled()) 272 logger.debug("Last write completed, notifying reads"); 273 writeSync.notifyAll(); } 275 } 276 } 277 278 284 private void waitForReadCompletion(AbstractRequest request) 285 throws SQLException 286 { 287 synchronized (this.readSync) 288 { 289 try 291 { 292 if (logger.isDebugEnabled()) 293 logger.debug("Request " + request.getId() + " waiting for " 294 + pendingReads + " pending reads)"); 295 296 int timeout = request.getTimeout(); 297 if (timeout > 0) 298 { 299 long start = System.currentTimeMillis(); 300 long lTimeout = timeout * 1000; 302 this.readSync.wait(lTimeout); 303 long end = System.currentTimeMillis(); 304 int remaining = (int) (lTimeout - (end - start)); 305 if (remaining > 0) 306 request.setTimeout(remaining); 307 else 308 { 309 String msg = "Timeout (" + request.getTimeout() + ") for request: " 310 + request.getId(); 311 logger.warn(msg); 312 throw new SQLException (msg); 313 } 314 } 315 else 316 this.readSync.wait(); 317 } 318 catch (InterruptedException e) 319 { 320 if (logger.isWarnEnabled()) 322 logger.warn("Request " + request.getId() + " timed out (" 323 + request.getTimeout() + " ms)"); 324 throw new SQLException ("Timeout (" + request.getTimeout() 325 + ") for request: " + request.getId()); 326 } 327 } 328 } 329 330 334 337 protected final void commitTransaction(long transactionId) 338 { 339 } 340 341 344 protected final void rollbackTransaction(long transactionId) 345 { 346 } 347 348 352 protected final void rollbackTransaction(long transactionId, 353 String savepointName) 354 { 355 } 356 357 361 protected final void setSavepointTransaction(long transactionId, String name) 362 { 363 } 364 365 369 protected final void releaseSavepointTransaction(long transactionId, 370 String name) 371 { 372 } 373 374 380 public String getXmlImpl() 381 { 382 StringBuffer info = new StringBuffer (); 383 info.append("<" + DatabasesXmlTags.ELT_RAIDb2Scheduler + " " 384 + DatabasesXmlTags.ATT_level + "=\"" + DatabasesXmlTags.VAL_query 385 + "\"/>"); 386 info.append(System.getProperty("line.separator")); 387 return info.toString(); 388 } 389 } 390 | Popular Tags |