package org.objectweb.cjdbc.controller.scheduler.raidb1;

import java.sql.SQLException;
import java.util.ArrayList;

import org.objectweb.cjdbc.common.exceptions.RollbackException;
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
import org.objectweb.cjdbc.common.sql.SelectRequest;
import org.objectweb.cjdbc.common.sql.StoredProcedure;
import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
import org.objectweb.cjdbc.common.sql.schema.DatabaseTable;
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
import org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseSchema;
import org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseTable;
import org.objectweb.cjdbc.controller.scheduler.schema.TransactionExclusiveLock;

/**
 * RAIDb-1 scheduler that serialises write requests with one
 * {@link TransactionExclusiveLock} per table.
 * <p>
 * Read requests are never blocked: they only receive a request identifier.
 * Write requests acquire the lock of the table they target. A write executed
 * in auto-commit mode releases that lock as soon as its completion is
 * notified; a write executed inside a transaction keeps it until the
 * transaction commits or rolls back. Before a transactional write blocks on a
 * lock owned by another transaction, the scheduler checks whether that owner is
 * itself waiting on a lock held by the requesting transaction and, if so,
 * rolls the requesting transaction back.
 * <p>
 * Stored procedures are not supported.
 */
public class RAIDb1OptimisticTransactionLevelScheduler
    extends AbstractScheduler
{
  /**
   * Next request identifier to hand out. Every increment happens while holding
   * this scheduler's monitor.
   */
  private long                    requestId;

  /** Tables (and their locks) known to the scheduler; may be null. */
  private SchedulerDatabaseSchema schedulerDatabaseSchema = null;

  /**
   * Creates a scheduler configured for RAIDb level 1 and table-level parsing
   * granularity.
   */
  public RAIDb1OptimisticTransactionLevelScheduler()
  {
    super(RAIDbLevels.RAIDb1, ParsingGranularities.TABLE);
    requestId = 0;
  }

  /**
   * Installs or refreshes the scheduler schema from the given database schema.
   * <p>
   * When no schema is installed yet, a new one is built from <code>dbs</code>.
   * Otherwise the current schema is updated in place: tables that are absent
   * from the new schema are removed and tables that are new are added, so the
   * lock objects of the tables that are kept are preserved. If the new schema
   * exposes no table list at all, the scheduler schema is dropped.
   * 
   * @param dbs the database schema to synchronise with
   */
  public synchronized void setDatabaseSchema(DatabaseSchema dbs)
  {
    if (schedulerDatabaseSchema == null)
    {
      logger.info("Setting new database schema");
      schedulerDatabaseSchema = new SchedulerDatabaseSchema(dbs);
      return;
    }

    SchedulerDatabaseSchema newSchema = new SchedulerDatabaseSchema(dbs);
    ArrayList newTables = newSchema.getTables();
    if (newTables == null)
    {
      logger.info("Removing all tables.");
      schedulerDatabaseSchema = null;
      return;
    }

    // Remove the tables that no longer exist. Iterate over a snapshot:
    // getTables() may hand out the list that removeTable() modifies, in which
    // case an index-based walk over the live list would skip the element that
    // slides into the slot of each removed table.
    ArrayList currentTables = schedulerDatabaseSchema.getTables();
    if (currentTables != null)
    {
      ArrayList snapshot = new ArrayList(currentTables);
      int currentSize = snapshot.size();
      for (int i = 0; i < currentSize; i++)
      {
        SchedulerDatabaseTable t = (SchedulerDatabaseTable) snapshot.get(i);
        if (!newSchema.hasTable(t.getName()))
        {
          schedulerDatabaseSchema.removeTable(t);
          if (logger.isInfoEnabled())
            logger.info("Removing table " + t.getName());
        }
      }
    }

    // Add the tables that are not known yet
    int size = newTables.size();
    for (int i = 0; i < size; i++)
    {
      SchedulerDatabaseTable t = (SchedulerDatabaseTable) newTables.get(i);
      if (!schedulerDatabaseSchema.hasTable(t.getName()))
      {
        schedulerDatabaseSchema.addTable(t);
        if (logger.isInfoEnabled())
          logger.info("Adding table " + t.getName());
      }
    }
  }

  /**
   * Merges the given database schema into the current scheduler schema. Any
   * exception raised while merging is logged and not propagated.
   * 
   * @param dbs the database schema to merge
   */
  public void mergeDatabaseSchema(DatabaseSchema dbs)
  {
    try
    {
      logger.info("Merging new database schema");
      schedulerDatabaseSchema.mergeSchema(new SchedulerDatabaseSchema(dbs));
    }
    catch (Exception e)
    {
      logger.error("Error while merging new database schema", e);
    }
  }

  /**
   * Schedules a read request. Reads are never delayed; the request only gets
   * the next request identifier.
   * 
   * @param request the read request to schedule
   * @throws SQLException declared by the signature, never thrown by this body
   */
  public final void scheduleReadRequest(SelectRequest request)
      throws SQLException
  {
    synchronized (this)
    {
      request.setId(requestId++);
    }
  }

  /**
   * Notifies the completion of a read request. Nothing is done here.
   * 
   * @param request the completed read request
   */
  public final void readCompletedNotify(SelectRequest request)
  {
  }

  /**
   * Schedules a write request, acquiring the lock of the targeted table.
   * <p>
   * CREATE requests are let through without locking. For a request inside a
   * transaction:
   * <ul>
   * <li>if the table lock is free, it is acquired while holding the scheduler
   * monitor;</li>
   * <li>if the transaction already owns the lock, only a request identifier is
   * assigned;</li>
   * <li>if another transaction owns the lock and is itself waiting on a lock
   * owned by this transaction, all locks of this transaction are released and
   * a {@link RollbackException} is thrown;</li>
   * <li>otherwise the request blocks on the lock, outside the scheduler
   * monitor.</li>
   * </ul>
   * Only this direct two-transaction cycle is detected.
   * 
   * @param request the write request to schedule
   * @throws SQLException if the table is unknown to the scheduler or the lock
   *           could not be acquired
   * @throws RollbackException if a deadlock is detected
   */
  public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request)
      throws SQLException, RollbackException
  {
    if (request.isCreate())
    {
      // The table does not exist yet in the scheduler schema: no lock to take
      synchronized (this)
      {
        request.setId(requestId++);
      }
      return;
    }

    SchedulerDatabaseTable t = schedulerDatabaseSchema.getTable(request
        .getTableName());
    if (t == null)
    {
      String msg = "No table found for request " + request.getId();
      logger.error(msg);
      throw new SQLException(msg);
    }

    TransactionExclusiveLock tableLock = t.getLock();
    if (!request.isAutoCommit())
    {
      synchronized (this)
      {
        if (!tableLock.isLocked())
        {
          // Lock is free: take it while we still hold the scheduler monitor
          acquireLockAndSetRequestId(request, tableLock);
          return;
        }

        long owner = tableLock.getLocker();
        long us = request.getTransactionId();
        if (owner == us)
        {
          // We already own the lock and hold the monitor: just assign the id
          request.setId(requestId++);
          return;
        }

        // Another transaction owns the lock. Is that owner blocked on a lock
        // that we hold? If so, waiting here would deadlock.
        ArrayList tables = schedulerDatabaseSchema.getTables();
        int size = tables.size();
        for (int i = 0; i < size; i++)
        {
          SchedulerDatabaseTable table = (SchedulerDatabaseTable) tables
              .get(i);
          if (table == null)
            continue;
          TransactionExclusiveLock lock = table.getLock();
          if (lock.isLocked() && lock.getLocker() == us
              && lock.isWaiting(owner))
          {
            releaseLocks(us);
            throw new RollbackException(
                "Deadlock detected, rollbacking transaction " + us);
          }
        }
      }
    }

    // Auto-commit request, or no deadlock found: wait for the lock without
    // holding the scheduler monitor
    acquireLockAndSetRequestId(request, tableLock);
  }

  /**
   * Acquires the given table lock on behalf of the request and, on success,
   * assigns the next request identifier.
   * 
   * @param request the write request being scheduled
   * @param tableLock the lock of the table targeted by the request
   * @throws SQLException if <code>tableLock.acquire(request)</code> returns
   *           false, which is reported as a timeout
   */
  private void acquireLockAndSetRequestId(AbstractWriteRequest request,
      TransactionExclusiveLock tableLock) throws SQLException
  {
    if (tableLock.acquire(request))
    {
      synchronized (this)
      {
        request.setId(requestId++);
      }
      if (logger.isDebugEnabled())
        logger.debug("Request " + request.getId() + " scheduled for write ("
            + getPendingWrites() + " pending writes)");
    }
    else
    {
      if (logger.isWarnEnabled())
        logger.warn("Request " + request.getId() + " timed out ("
            + request.getTimeout() + " ms)");
      throw new SQLException("Timeout (" + request.getTimeout()
          + ") for request: " + request.getId());
    }
  }

  /**
   * Notifies the completion of a write request.
   * <p>
   * A CREATE adds the table to the scheduler schema and a DROP removes it.
   * For every request other than a DROP, the table lock is released if the
   * request ran in auto-commit mode; transactional requests keep their locks
   * until commit or rollback.
   * 
   * @param request the completed write request
   */
  public final void notifyWriteCompleted(AbstractWriteRequest request)
  {
    if (request.isCreate())
    {
      if (logger.isDebugEnabled())
        logger.debug("Adding table '" + request.getTableName()
            + "' to scheduler schema");
      synchronized (this)
      {
        schedulerDatabaseSchema.addTable(new SchedulerDatabaseTable(
            new DatabaseTable(request.getTableName())));
      }
      // NOTE(review): an auto-commit CREATE falls through to the release
      // below although scheduleNonSuspendedWriteRequest() never acquired a
      // lock for it -- confirm that release() on such a lock is harmless.
    }
    else if (request.isDrop())
    {
      if (logger.isDebugEnabled())
        logger.debug("Removing table '" + request.getTableName()
            + "' to scheduler schema");
      synchronized (this)
      {
        schedulerDatabaseSchema.removeTable(schedulerDatabaseSchema
            .getTable(request.getTableName()));
      }
      // NOTE(review): the lock acquired when the DROP was scheduled is not
      // released here, and releaseLocks() no longer sees the removed table
      // -- confirm that requests waiting on it are handled elsewhere.
      return;
    }

    // Requests outside a transaction release their lock as soon as they are
    // done
    if (request.isAutoCommit())
    {
      SchedulerDatabaseTable t = schedulerDatabaseSchema.getTable(request
          .getTableName());
      if (t == null)
      {
        String msg = "No table found to release lock for request "
            + request.getId();
        logger.error(msg);
      }
      else
        t.getLock().release();
    }
  }

  /**
   * Always fails: stored procedures are not supported by this scheduler.
   * 
   * @param proc the stored procedure to schedule
   * @throws SQLException always
   * @throws RollbackException declared by the signature, never thrown here
   */
  public void scheduleNonSuspendedStoredProcedure(StoredProcedure proc)
      throws SQLException, RollbackException
  {
    throw new SQLException(
        "Stored procedures are not supported by the RAIDb-1 optimistic transaction level scheduler.");
  }

  /**
   * Always fails with a {@link RuntimeException}: stored procedures are not
   * supported by this scheduler.
   * 
   * @param proc the completed stored procedure
   */
  public void notifyStoredProcedureCompleted(StoredProcedure proc)
  {
    throw new RuntimeException(
        "Stored procedures are not supported by the RAIDb-1 optimistic transaction level scheduler.");
  }

  /**
   * Releases every table lock owned by the committing transaction.
   * 
   * @param transactionId identifier of the transaction that commits
   */
  protected final void commitTransaction(long transactionId)
  {
    releaseLocks(transactionId);
  }

  /**
   * Releases every table lock owned by the transaction being rolled back.
   * 
   * @param transactionId identifier of the transaction that rolls back
   */
  protected final void rollbackTransaction(long transactionId)
  {
    releaseLocks(transactionId);
  }

  /**
   * Rollback to a savepoint. Does nothing; the locks held by the transaction
   * are not released here.
   * 
   * @param transactionId identifier of the transaction
   * @param savepointName name of the savepoint rolled back to
   */
  protected final void rollbackTransaction(long transactionId,
      String savepointName)
  {
  }

  /**
   * Savepoint creation. Does nothing.
   * 
   * @param transactionId identifier of the transaction
   * @param name name of the savepoint
   */
  protected final void setSavepointTransaction(long transactionId, String name)
  {
  }

  /**
   * Savepoint release. Does nothing.
   * 
   * @param transactionId identifier of the transaction
   * @param name name of the savepoint
   */
  protected final void releaseSavepointTransaction(long transactionId,
      String name)
  {
  }

  /**
   * Releases all the table locks currently owned by the given transaction.
   * <p>
   * Returns silently when there is no scheduler schema or no table list:
   * setDatabaseSchema() can drop the schema, and in that state there is no
   * lock left to release.
   * 
   * @param transactionId identifier of the transaction whose locks are
   *          released
   */
  private synchronized void releaseLocks(long transactionId)
  {
    if (schedulerDatabaseSchema == null)
      return;
    ArrayList tables = schedulerDatabaseSchema.getTables();
    if (tables == null)
      return;

    int size = tables.size();
    for (int i = 0; i < size; i++)
    {
      SchedulerDatabaseTable t = (SchedulerDatabaseTable) tables.get(i);
      if (t == null)
        continue;
      TransactionExclusiveLock lock = t.getLock();
      if (lock.isLocked() && lock.getLocker() == transactionId)
        lock.release();
    }
  }

  /**
   * Returns the XML element describing this scheduler: a RAIDb-1 scheduler
   * element whose level attribute is the optimistic transaction value.
   * 
   * @return the XML fragment for this scheduler
   */
  public String getXmlImpl()
  {
    return "<" + DatabasesXmlTags.ELT_RAIDb1Scheduler + " "
        + DatabasesXmlTags.ATT_level + "=\""
        + DatabasesXmlTags.VAL_optimisticTransaction + "\"/>";
  }
}