1 package org.jacorb.concurrency; 2 3 4 5 42 43 44 45 import org.omg.CosConcurrencyControl.*; 46 47 import org.omg.PortableServer.POA ; 48 49 import org.omg.CosTransactions.*; 50 51 import java.util.*; 52 53 54 class TransactionalLockSetImpl extends TransactionalLockSetPOA { 55 56 private Hashtable locks = new Hashtable(); 57 58 private Vector queue = new Vector(); 59 60 private Vector related = new Vector(); 61 62 private LockSetFactoryImpl factory; 63 64 private boolean is_active = true; 65 66 TransactionalLockSetImpl( LockSetFactoryImpl factory ) { 67 68 this.factory = factory; 69 70 }; 71 72 public void lock( Coordinator current, lock_mode mode ) { 73 74 synchronized( queue ){ 75 76 check_active(); 77 78 TransactionCoordinator tc = factory.get_transaction_coordinator(current); 79 80 Request rqst = null; 81 82 synchronized( tc ){ 83 84 check_status(tc); 85 86 if(attempt_lock( tc, mode )){ 87 88 return; 89 90 } 91 92 rqst = new Request(); 93 94 rqst.state = LockSetFactoryImpl.REQUEST; 95 96 rqst.current = tc; 97 98 rqst.to_do = Request.LOCK; 99 100 rqst.set_mode = mode; 101 102 rqst.reset_mode = null; 103 104 queue.addElement( rqst ); 105 106 tc.set_lock_coordinator( this ); 107 108 } 109 110 while( rqst.state == LockSetFactoryImpl.REQUEST ){ 111 112 try { 113 114 queue.wait(); 115 116 } catch ( Exception e ){ 117 118 e.printStackTrace( System.out ); 119 120 throw new org.omg.CORBA.INTERNAL (); 121 122 } 123 124 }; 125 126 switch( rqst.state ){ 127 128 case LockSetFactoryImpl.COMMIT : 129 130 case LockSetFactoryImpl.NO_TRANS : 131 132 throw new org.omg.CORBA.INVALID_TRANSACTION (); 133 134 case LockSetFactoryImpl.ROLLBACK : 135 136 throw new org.omg.CORBA.TRANSACTION_ROLLEDBACK (); 137 138 } 139 140 } 141 142 }; 143 144 public boolean try_lock( Coordinator current, lock_mode mode ) { 145 146 synchronized( queue ){ 147 148 check_active(); 149 150 TransactionCoordinator tc = factory.get_transaction_coordinator(current); 151 152 synchronized( tc ){ 153 154 check_status(tc); 155 156 return attempt_lock( tc, mode ); 157 158 } 159 160 } 161 162 }; 163 164 public void unlock( Coordinator current, lock_mode mode ) throws LockNotHeld { 165 166 synchronized( queue ){ 167 168 check_active(); 169 170 TransactionCoordinator tc = factory.get_transaction_coordinator(current); 171 172 synchronized( tc ){ 173 174 check_status(tc); 175 176 TransactionLocks current_locks = (TransactionLocks)locks.get( tc ); 177 178 if( current_locks == null ){ 179 180 throw new LockNotHeld(); 181 182 } 183 184 current_locks.unlock(mode); 185 186 } 187 188 if( attempt_lock_from_queue() ){ 189 190 queue.notifyAll(); 191 192 }; 193 194 } 195 196 }; 197 198 public void change_mode( Coordinator current, lock_mode held_mode, lock_mode new_mode ) throws LockNotHeld { 199 200 synchronized( queue ){ 201 202 check_active(); 203 204 TransactionCoordinator tc = factory.get_transaction_coordinator(current); 205 206 Request rqst = null; 207 208 synchronized( tc ){ 209 210 check_status(tc); 211 212 if(attempt_change( tc, new_mode, held_mode )){ 213 214 return; 215 216 } 217 218 rqst = new Request(); 219 220 rqst.state = LockSetFactoryImpl.REQUEST; 221 222 rqst.current = tc; 223 224 rqst.to_do = Request.CHANGE; 225 226 rqst.set_mode = new_mode; 227 228 rqst.reset_mode = held_mode; 229 230 queue.addElement( rqst ); 231 232 tc.get_lock_coordinator( this ); 233 234 } 235 236 while( rqst.state == LockSetFactoryImpl.REQUEST ){ 237 238 try { 239 240 queue.wait(); 241 242 } catch ( Exception e ){ 243 244 e.printStackTrace( System.out ); 245 246 throw new org.omg.CORBA.INTERNAL (); 247 248 } 249 250 }; 251 252 switch( rqst.state ){ 253 254 case LockSetFactoryImpl.COMMIT : 255 256 case LockSetFactoryImpl.NO_TRANS : 257 258 throw new org.omg.CORBA.INVALID_TRANSACTION (); 259 260 case LockSetFactoryImpl.ROLLBACK : 261 262 throw new org.omg.CORBA.TRANSACTION_ROLLEDBACK (); 263 264 case LockSetFactoryImpl.REJECT : 265 266 throw new LockNotHeld(); 267 268 } 269 270 if( attempt_lock_from_queue() ){ 271 272 queue.notifyAll(); 273 274 }; 275 276 } 277 278 }; 279 280 public LockCoordinator get_coordinator( Coordinator which ) { 281 282 TransactionCoordinator tc = factory.get_transaction_coordinator( which ); 283 284 return tc.get_lock_coordinator( this ); 285 286 }; 287 288 synchronized void add_related( TransactionalLockSet which ){ 289 290 related.addElement( which ); 291 292 }; 293 294 synchronized boolean attempt_lock( TransactionCoordinator tc, lock_mode mode ){ 295 296 Enumeration enumeration = locks.elements(); 297 298 TransactionLocks current_transaction_locks = null; 299 300 while( enumeration.hasMoreElements() ){ 301 302 TransactionLocks lock = (TransactionLocks) enumeration.nextElement(); 303 304 if( lock.current == tc ){ 305 306 current_transaction_locks = lock; 307 308 continue; 309 310 } 311 312 if( !lock.no_conflict( mode ) ){ 313 314 return false; 315 316 } 317 318 }; 319 320 if( current_transaction_locks == null ){ 321 322 current_transaction_locks = new TransactionLocks( tc ); 323 324 tc.get_lock_coordinator( this ); 325 326 locks.put( tc, current_transaction_locks ); 327 328 } 329 330 current_transaction_locks.lock( mode ); 331 332 return true; 333 334 }; 335 336 private void check_status( TransactionCoordinator tc ){ 337 338 Status status = tc.get_state(); 339 340 if( status.equals( Status.StatusActive ) ){ 341 342 return; 343 344 } else if( status.equals( Status.StatusPrepared ) || 345 346 status.equals( Status.StatusCommitted ) || 347 348 status.equals( Status.StatusUnknown ) || 349 350 status.equals( Status.StatusNoTransaction ) || 351 352 status.equals( Status.StatusPreparing ) || 353 354 status.equals( Status.StatusCommitting ) ) { 355 356 throw new org.omg.CORBA.INVALID_TRANSACTION (); 357 358 } else if (status.equals( Status.StatusRollingBack ) || 359 360 status.equals( Status.StatusMarkedRollback) || 361 362 status.equals( Status.StatusRolledBack) ) { 363 364 throw new org.omg.CORBA.TRANSACTION_ROLLEDBACK (); 365 366 } 367 368 } 369 370 private synchronized boolean attempt_lock_from_queue(){ 371 372 boolean rc = false; 373 374 boolean do_recursive = false; 375 376 Vector executed = new Vector(); 377 378 Enumeration enumeration = queue.elements(); 379 380 while( enumeration.hasMoreElements() ){ 381 382 Request r = (Request)enumeration.nextElement(); 383 384 synchronized( r.current ){ 385 386 try { 387 388 check_status( r.current ); 389 390 } catch ( org.omg.CORBA.INVALID_TRANSACTION e ) { 391 392 r.state = LockSetFactoryImpl.NO_TRANS; 393 394 executed.addElement( r ); 395 396 rc = true; 397 398 continue; 399 400 } catch ( org.omg.CORBA.TRANSACTION_ROLLEDBACK e ) { 401 402 r.state = LockSetFactoryImpl.ROLLBACK; 403 404 executed.addElement( r ); 405 406 rc = true; 407 408 continue; 409 410 } 411 412 switch( r.to_do ) { 413 414 case Request.LOCK: 415 416 if( !attempt_lock( r.current, r.set_mode ) ) { 417 418 continue; 419 420 } 421 422 r.state = LockSetFactoryImpl.SATISFIED; 423 424 break; 425 426 case Request.CHANGE: 427 428 try { 429 430 if( !attempt_change( r.current, r.set_mode, r.reset_mode ) ) { 431 432 continue; 433 434 } 435 436 r.state = LockSetFactoryImpl.SATISFIED; 437 438 do_recursive = true; 439 440 } catch ( LockNotHeld e ) { 441 442 r.state = LockSetFactoryImpl.REJECT; 443 444 } 445 446 break; 447 448 } 449 450 executed.addElement( r ); 451 452 rc = true; 453 454 } 455 456 }; 457 458 enumeration = executed.elements(); 459 460 while( enumeration.hasMoreElements() ){ 461 462 queue.removeElement( enumeration.nextElement() ); 463 464 } 465 466 if( do_recursive ){ 467 468 attempt_lock_from_queue(); 469 470 } 471 472 return executed.size() > 0; 473 474 }; 475 476 private synchronized boolean attempt_change( TransactionCoordinator tc, lock_mode set_mode, lock_mode reset_mode ) throws LockNotHeld { 477 478 TransactionLocks current_locks = (TransactionLocks)locks.get( tc ); 479 480 if( current_locks == null || !current_locks.is_held( reset_mode ) ){ 481 482 throw new LockNotHeld(); 483 484 } 485 486 if( attempt_lock( tc, set_mode ) ){ 487 488 current_locks.unlock( reset_mode ); 489 490 return true; 491 492 } 493 494 return false; 495 496 }; 497 498 synchronized void transaction_finished( TransactionCoordinator tc ){ 499 500 Vector executed = new Vector(); 501 502 Enumeration enumeration; 503 504 boolean do_notify = false; 505 506 synchronized( queue ){ 507 508 enumeration = queue.elements(); 509 510 while( enumeration.hasMoreElements() ){ 511 512 Request r = (Request)enumeration.nextElement(); 513 514 if( r.current == tc ){ 515 516 r.state = LockSetFactoryImpl.ROLLBACK; 517 518 executed.addElement( r ); 519 520 } 521 522 } 523 524 if( executed.size() > 0 ) { 525 526 enumeration = executed.elements(); 527 528 while( enumeration.hasMoreElements() ){ 529 530 queue.removeElement( enumeration.nextElement() ); 531 532 } 533 534 do_notify = true; 535 536 } 537 538 if( locks.remove( tc ) != null ) { 539 540 do_notify = attempt_lock_from_queue()?true:do_notify; 541 542 } 543 544 if( do_notify ) { 545 546 queue.notifyAll(); 547 548 } 549 550 } 551 552 if( related.size() > 0 ) { 553 554 enumeration = related.elements(); 555 556 while( enumeration.hasMoreElements() ){ 557 558 TransactionalLockSet ls = (TransactionalLockSet)enumeration.nextElement(); 559 560 ls.get_coordinator( tc.get_coordinator() ).drop_locks(); 561 562 } 563 564 } 565 566 }; 567 568 public void print(){ 569 570 Enumeration enumeration; 571 572 System.out.println("\n============================================================================="); 573 574 System.out.println(" LOCKS"+locks.size() ); 575 576 System.out.println("-----------------------------------------------------------------------------"); 577 578 synchronized ( queue ) { 579 580 enumeration = locks.elements(); 581 582 while( enumeration.hasMoreElements() ){ 583 584 TransactionLocks r = (TransactionLocks)enumeration.nextElement(); 585 586 System.out.println( r.toString() ); 587 588 } 589 590 System.out.println("\n-----------------------------------------------------------------------------"); 591 592 System.out.println(" QUEUE"+queue.size() ); 593 594 System.out.println("-----------------------------------------------------------------------------"); 595 596 enumeration = queue.elements(); 597 598 while( enumeration.hasMoreElements() ){ 599 600 Request r = (Request)enumeration.nextElement(); 601 602 System.out.println( r.toString() ); 603 604 } 605 606 }; 607 608 System.out.println("=============================================================================\n"); 609 610 }; 611 612 614 public void destroy() { 615 616 synchronized( queue ){ 617 618 check_active(); 619 620 is_active = false; 621 622 if( locks.size() > 0 ){ 623 624 Enumeration enumeration = locks.elements(); 625 626 while( enumeration.hasMoreElements() ){ 627 628 TransactionLocks ls = (TransactionLocks)enumeration.nextElement(); 629 630 if( ls.any_locks() ){ 631 632 throw new RuntimeException ("LockExists"); 633 634 } 635 636 } 637 638 } 639 640 factory.remove_me( this ); 641 642 }; 643 644 }; 645 646 private void check_active(){ 647 648 if( !is_active ){ 649 650 throw new org.omg.CORBA.OBJECT_NOT_EXIST (); 651 652 } 653 654 } 655 656 }; 657 658 659 660 661 662 663 664 665 666 667 668 669 670 | Popular Tags |